Merge pull request #1792 from lbryio/delete-unused-scripts

delete unused scripts
commit 68d7a7014c
Jack Robison 2019-01-22 09:53:24 -05:00, committed by GitHub (GPG key ID: 4AEE18F83AFDEB23)
9 changed files with 0 additions and 1027 deletions


@@ -1,57 +0,0 @@
"""Decrypt a single blob"""
import argparse
import binascii
import logging
import os
import sys
from twisted.internet import defer
from twisted.internet import reactor
from lbrynet import conf
from lbrynet.cryptstream import CryptBlob
from lbrynet.blob import BlobFile
from lbrynet.core import log_support
log = logging.getLogger('decrypt_blob')
def main():
conf.initialize_settings()
parser = argparse.ArgumentParser()
parser.add_argument('blob_file')
parser.add_argument('hex_key')
parser.add_argument('hex_iv')
parser.add_argument('output')
args = parser.parse_args()
log_support.configure_console()
d = run(args)
reactor.run()
@defer.inlineCallbacks
def run(args):
try:
yield decrypt_blob(args.blob_file, args.hex_key, args.hex_iv, args.output)
except Exception:
log.exception('Failed to decrypt blob')
finally:
reactor.callLater(0, reactor.stop)
@defer.inlineCallbacks
def decrypt_blob(blob_file, key, iv, output):
filename = os.path.abspath(blob_file)
length = os.path.getsize(filename)
directory, blob_hash = os.path.split(filename)
blob = BlobFile(directory, blob_hash, length)
decryptor = CryptBlob.StreamBlobDecryptor(
blob, binascii.unhexlify(key), binascii.unhexlify(iv), length)
with open(output, 'w') as f:
yield decryptor.decrypt(f.write)
if __name__ == '__main__':
sys.exit(main())
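
The hex_key and hex_iv arguments are unhexlified before being handed to StreamBlobDecryptor. For orientation, a standalone sketch of the same operation, assuming the blob is AES-256-CBC encrypted with PKCS7 padding (an assumption about the blob format; the function name and the `cryptography` dependency are illustrative, not part of this repo):

# sketch only: AES-256-CBC + PKCS7 is an assumption, not stated by this diff
import binascii
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

def decrypt_blob_bytes(ciphertext, hex_key, hex_iv):
    key = binascii.unhexlify(hex_key)  # 64 hex chars -> 32-byte AES-256 key
    iv = binascii.unhexlify(hex_iv)    # 32 hex chars -> 16-byte CBC IV
    decryptor = Cipher(algorithms.AES(key), modes.CBC(iv), default_backend()).decryptor()
    padded = decryptor.update(ciphertext) + decryptor.finalize()
    unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
    return unpadder.update(padded) + unpadder.finalize()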


@@ -1,85 +0,0 @@
import curses
import time
import datetime

from jsonrpc.proxy import JSONRPCProxy

stdscr = curses.initscr()
api = JSONRPCProxy.from_url("http://localhost:5280")


def init_curses():
    curses.noecho()
    curses.cbreak()
    stdscr.nodelay(1)
    stdscr.keypad(1)


def teardown_curses():
    curses.nocbreak()
    stdscr.keypad(0)
    curses.echo()
    curses.endwin()


def refresh(node_index):
    height, width = stdscr.getmaxyx()
    node_ids = api.get_node_ids()
    node_id = node_ids[node_index]
    node_statuses = api.node_status()
    running = node_statuses[node_id]
    buckets = api.node_routing_table(node_id=node_id)
    for y in range(height):
        stdscr.addstr(y, 0, " " * (width - 1))
    stdscr.addstr(0, 0, "node id: %s, running: %s (%i/%i running)" %
                  (node_id, running, sum(node_statuses.values()), len(node_ids)))
    stdscr.addstr(1, 0, "%i buckets, %i contacts" %
                  (len(buckets), sum([len(buckets[b]['contacts']) for b in buckets])))
    y = 3
    for i in sorted(buckets.keys(), key=int):  # JSON object keys arrive as strings
        stdscr.addstr(y, 0, "bucket %s" % i)
        y += 1
        for h in sorted(buckets[i]['contacts'], key=lambda x: x['node_id'].decode('hex')):
            # format_contact() in the seed-node script publishes the port as "nodePort"
            stdscr.addstr(y, 0, '%s (%s:%i) failures: %i, last replied to us: %s, last requested from us: %s' %
                          (h['node_id'], h['address'], h['nodePort'], h['failedRPCs'],
                           datetime.datetime.fromtimestamp(float(h['lastReplied'] or 0)),
                           datetime.datetime.fromtimestamp(float(h['lastRequested'] or 0))))
            y += 1
        y += 1
    stdscr.addstr(y + 1, 0, str(time.time()))
    stdscr.refresh()
    return len(node_ids)


def do_main():
    c = None
    nodes = 1
    node_index = 0
    while c not in [ord('q'), ord('Q')]:
        try:
            nodes = refresh(node_index)
        except Exception:  # keep redrawing even if a poll fails mid-update
            pass
        c = stdscr.getch()
        if c == curses.KEY_LEFT:
            node_index -= 1
            node_index = max(node_index, 0)
        elif c == curses.KEY_RIGHT:
            node_index += 1
            node_index = min(node_index, nodes - 1)
        time.sleep(0.1)


def main():
    try:
        init_curses()
        do_main()
    finally:
        teardown_curses()


if __name__ == "__main__":
    main()
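
The monitor drives everything through three JSON-RPC methods (get_node_ids, node_status, node_routing_table) exposed by the multi-seed-node RPC server that appears later in this diff. A minimal non-curses sketch of the same polling, assuming that server is running on localhost:5280:

from jsonrpc.proxy import JSONRPCProxy

api = JSONRPCProxy.from_url("http://localhost:5280")
node_ids = api.get_node_ids()    # hex ids of all local seed nodes
statuses = api.node_status()     # node_id -> whether the node has joined
print("%i/%i nodes running" % (sum(statuses.values()), len(node_ids)))
buckets = api.node_routing_table(node_id=node_ids[0])
for i in sorted(buckets, key=int):
    print("bucket %s: %i contacts" % (i, len(buckets[i]['contacts'])))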

download_blob_from_peer.py

@@ -1,104 +0,0 @@
"""A simple script that attempts to directly download a single blob or stream from a given peer"""
import argparse
import logging
import sys
import tempfile
import time
import shutil
from pprint import pprint
from twisted.internet import asyncioreactor
asyncioreactor.install()
from twisted.internet import defer, threads, reactor
from lbrynet import conf, log_support
from lbrynet.p2p import Peer
from lbrynet.p2p.SinglePeerDownloader import SinglePeerDownloader
from lbrynet.p2p.StreamDescriptor import BlobStreamDescriptorReader
from lbrynet.p2p.BlobManager import DiskBlobManager
from lbrynet.extras.daemon.Components import f2d
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.extras.wallet import LbryWalletManager
log = logging.getLogger()
def main(args=None):
conf.initialize_settings()
parser = argparse.ArgumentParser()
parser.add_argument('peer')
parser.add_argument('blob_hash')
parser.add_argument('--timeout', type=int, default=30)
args = parser.parse_args(args)
log_support.configure_console(level='DEBUG')
log_support.configure_twisted()
if ":" in str(args.peer):
host, port = str(args.peer).strip().split(":")
else:
host = args.peer
port = 3333
d = download_it(Peer.Peer(host, int(port)), args.timeout, args.blob_hash)
d.addErrback(log.exception)
d.addBoth(lambda _: reactor.callLater(0, reactor.stop))
reactor.run()
@defer.inlineCallbacks
def download_it(peer, timeout, blob_hash):
tmp_dir = yield threads.deferToThread(tempfile.mkdtemp)
storage = SQLiteStorage(tmp_dir, reactor)
yield storage.setup()
tmp_blob_manager = DiskBlobManager(tmp_dir, storage)
config = {'auto_connect': True}
config['wallet_dir'] = tempfile.mkdtemp()
config['use_keyring'] = False
config['blockchain_name'] = conf.settings['blockchain_name']
config['lbryum_servers'] = []
wallet = yield f2d(LbryWalletManager.from_lbrynet_config(config, storage))
downloader = SinglePeerDownloader()
downloader.setup(wallet)
try:
blob_downloaded = yield downloader.download_blob_from_peer(peer, timeout, blob_hash,
tmp_blob_manager)
if blob_downloaded:
log.info("SUCCESS!")
blob = yield tmp_blob_manager.get_blob(blob_hash)
pprint(blob)
if not blob.verified:
log.error("except that its not verified....")
else:
reader = BlobStreamDescriptorReader(blob)
info = None
for x in range(0, 3):
try:
info = yield reader.get_info()
except ValueError:
pass
if info:
break
# there's some kind of race condition where it sometimes doesn't write the blob to disk in time
time.sleep(0.1)
if info is not None:
pprint(info)
for content_blob in info['blobs']:
if 'blob_hash' in content_blob:
yield download_it(peer, timeout, content_blob['blob_hash'])
else:
log.error("Download failed")
finally:
yield tmp_blob_manager.stop()
yield threads.deferToThread(shutil.rmtree, tmp_dir)
defer.returnValue(True)
if __name__ == '__main__':
sys.exit(main())
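
The reflector test script later in this diff shells out to this file one blob at a time. A hypothetical direct invocation (the address and hash are placeholders; the peer port defaults to 3333 when omitted):

python download_blob_from_peer.py --timeout 10 203.0.113.5:3333 <blob_hash>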


@@ -1,80 +0,0 @@
"""A test script that downloads blobs from a reflector server"""
import argparse
import itertools
import json
import random
import subprocess
import sys
def main():
parser = argparse.ArgumentParser()
parser.add_argument('reflector_ip')
parser.add_argument('--ssh-key')
parser.add_argument('--size', type=int, default=100)
parser.add_argument('--batch', type=int, default=10)
parser.add_argument('--timeout', type=int, default=30)
parser.add_argument('--hashes', help='file listing hashes in json')
args = parser.parse_args()
if args.hashes:
hashes = readHashes(args.hashes)
else:
hashes = getHashes(args.reflector_ip, args.ssh_key)
if len(hashes) > args.size:
selected_hashes = random.sample(hashes, args.size)
else:
print 'Only {} hashes are available'.format(hashes)
selected_hashes = hashes
successes = 0
for hashes in grouper(selected_hashes, args.batch):
hashes = filter(None, hashes)
successes += downloadHashes(args.reflector_ip, hashes, args.timeout)
print 'Downloaded {} / {}'.format(successes, len(selected_hashes))
def grouper(iterable, n, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
args = [iter(iterable)] * n
return itertools.izip_longest(fillvalue=fillvalue, *args)
def readHashes(hash_file):
with open(hash_file) as f:
return json.load(f)
def getHashes(ip, key=None):
key = ['-i', key] if key else []
hashes = subprocess.check_output(['ssh'] + key +
['lbry@{}'.format(ip), '/opt/venvs/lbrynet/bin/lbrynet-cli', 'get_blob_hashes'])
return json.loads(hashes)
def downloadHashes(ip, blob_hashes, timeout=30):
processes = [
subprocess.Popen(
[
'python',
'download_blob_from_peer.py',
'--timeout', str(timeout), '{}:3333'.format(ip), blob_hash,
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
for blob_hash in blob_hashes
]
for p, h in zip(processes, blob_hashes):
stdout, stderr = p.communicate()
print p.returncode, h
if p.returncode != 0:
print 'Failed to download', h
print stdout
print stderr
return sum(1 for p in processes if p.returncode == 0)
if __name__ == '__main__':
sys.exit(main())
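
grouper is the standard itertools recipe; izip_longest is its Python 2 spelling. Under Python 3 the same batching would read:

from itertools import zip_longest

def grouper(iterable, n, fillvalue=None):
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

assert list(grouper('ABCDEFG', 3, 'x')) == [
    ('A', 'B', 'C'), ('D', 'E', 'F'), ('G', 'x', 'x')]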


@@ -1,63 +0,0 @@
"""Encrypt a single file using the given key and iv"""
import argparse
import logging
import sys
from twisted.internet import defer
from twisted.internet import reactor
from twisted.web.client import FileBodyProducer
from lbrynet import conf
from lbrynet.core import log_support
from lbrynet.extras.daemon.HashAnnouncer import DHTHashAnnouncer
from lbrynet.core.BlobManager import DiskBlobManager
from lbrynet.cryptstream.CryptStreamCreator import CryptStreamCreator
log = logging.getLogger('decrypt_blob')
def main():
conf.initialize_settings()
parser = argparse.ArgumentParser()
parser.add_argument('filename')
parser.add_argument('hex_key')
parser.add_argument('hex_iv')
args = parser.parse_args()
log_support.configure_console(level='DEBUG')
run(args)
reactor.run()
@defer.inlineCallbacks
def run(args):
try:
yield encrypt_blob(args.filename, args.hex_key, args.hex_iv)
except Exception:
log.exception('Failed to encrypt blob')
finally:
reactor.callLater(0, reactor.stop)
@defer.inlineCallbacks
def encrypt_blob(filename, key, iv):
dummy_announcer = DummyHashAnnouncer()
manager = DiskBlobManager(dummy_announcer, '.', '.')
yield manager.setup()
creator = CryptStreamCreator(manager, filename, key, iv_generator(iv))
with open(filename, 'r') as infile:
producer = FileBodyProducer(infile, readSize=2**22)
yield producer.startProducing(creator)
yield creator.stop()
def iv_generator(iv):
iv = int(iv, 16)
while 1:
iv += 1
yield ("%016d" % iv)[-16:]
if __name__ == '__main__':
sys.exit(main())
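
iv_generator treats the hex IV argument as a counter: it parses it as an integer, increments it once per blob, and yields the low 16 decimal digits, zero-padded. A worked example:

gen = iv_generator("ff")  # int("ff", 16) == 255
print(next(gen))          # "0000000000000256"
print(next(gen))          # "0000000000000257"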

gen_api_docs.py

@@ -1,206 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Generate docs: python gen_api_docs.py
# See docs: pip install mkdocs; mkdocs serve
# Push docs: mkdocs build

import re
import inspect
import subprocess
import os
import sys
from lbrynet.daemon.Daemon import Daemon
import pip

installed_packages = [package.project_name for package in pip.get_installed_distributions()]
for package in ["mkdocs", "mkdocs-material"]:
    if package not in installed_packages:
        print "'" + package + "' is not installed"
        sys.exit(1)

try:
    from tabulate import tabulate
except ImportError:
    raise ImportError("tabulate is not installed")

INDENT = " "
REQD_CMD_REGEX = r"\(.*?=<(?P<reqd>.*?)>\)"
OPT_CMD_REGEX = r"\[.*?=<(?P<opt>.*?)>\]"
CMD_REGEX = r"--.*?(?P<cmd>.*?)[=,\s,<]"
DOCS_BUILD_DIR = "docs_build"  # must match mkdocs.yml


def _cli_tabulate_options(_options_docstr, method):
    _option_list = []
    for line in _options_docstr.splitlines():
        if line.strip().startswith("--"):
            # separates command name and description
            parts = line.split(":", 1)
            # separates command type (in brackets) and description
            new_parts = parts[1].lstrip().split(" ", 1)
        else:
            parts = [line]
        # len will be 2 when there's cmd name and description
        if len(parts) == 2:
            _option_list.append([parts[0], ":", new_parts[0], new_parts[1]])
        # len will be 1 when there's continuation of a multiline description in the next line
        # check `blob_announce`'s `stream_hash` command
        elif len(parts) == 1:
            _option_list.append([None, None, None, parts[0]])
        else:
            print "Error: Ill formatted doc string for {}".format(method)
            print "Error causing line: {}".format(line)
    # tabulate to make the options look pretty
    _options_docstr_no_indent = tabulate(_option_list, missingval="", tablefmt="plain")
    # Indent the options properly
    _options_docstr = ""
    for line in _options_docstr_no_indent.splitlines():
        _options_docstr += INDENT + line + '\n'
    return _options_docstr


def _api_tabulate_options(_options_docstr, method, reqd_matches, opt_matches):
    _option_list = []
    for line in _options_docstr.splitlines():
        if line.strip().startswith("--"):
            # separates command name and description
            parts = line.split(":", 1)
            # checks whether the command is optional or required,
            # removes the cli type formatting and converts it to
            # api style formatting
            match = re.findall(CMD_REGEX, parts[0])
            if match[0] not in reqd_matches:
                parts[0] = "'" + match[0] + "'"
            else:
                parts[0] = "'" + match[0] + "' (required)"
            # separates command type (in brackets) and description
            new_parts = parts[1].lstrip().split(" ", 1)
        else:
            parts = [line]
        # len will be 2 when there's cmd name and description
        if len(parts) == 2:
            _option_list.append([parts[0], ":", new_parts[0], new_parts[1]])
        # len will be 1 when there's continuation of a multiline description in the next line
        # check `blob_announce`'s `stream_hash` command
        elif len(parts) == 1:
            _option_list.append([None, None, None, parts[0]])
        else:
            print "Error: Ill formatted doc string for {}".format(method)
            print "Error causing line: {}".format(line)
    # tabulate to make the options look pretty
    _options_docstr_no_indent = tabulate(_option_list, missingval="", tablefmt="plain")
    # Indent the options properly
    _options_docstr = ""
    for line in _options_docstr_no_indent.splitlines():
        _options_docstr += INDENT + line + '\n'
    return _options_docstr


def _cli_doc(obj):
    docstr = (inspect.getdoc(obj) or '').strip()
    try:
        _usage_docstr, _docstr_after_options = docstr.split("Options:", 1)
        _options_docstr, _returns_docstr = _docstr_after_options.split("Returns:", 1)
    except ValueError:
        print "Error: Ill formatted doc string for {}".format(obj)
        print "Please ensure that the docstring has all three headings, i.e. \"Usage:\","
        print "\"Options:\" and \"Returns:\", exactly as specified, including the colon"
        return "Error!"
    try:
        _options_docstr = _cli_tabulate_options(_options_docstr.strip(), obj)
    except Exception as e:
        print "Please make sure that the individual options are properly formatted"
        print "It should be strictly of the format:"
        print "--command_name=<command_name> : (type) desc"
        print e.message
    docstr = _usage_docstr + \
        "\nOptions:\n" + \
        _options_docstr + \
        "\nReturns:" + \
        _returns_docstr
    return docstr


def _api_doc(obj):
    docstr = (inspect.getdoc(obj) or '').strip()
    try:
        _desc, _docstr_after_desc = docstr.split("Usage:", 1)
        _usage_docstr, _docstr_after_options = _docstr_after_desc.split("Options:", 1)
        _options_docstr, _returns_docstr = _docstr_after_options.split("Returns:", 1)
    except ValueError:
        print "Error: Ill formatted doc string for {}".format(obj)
        print "Please ensure that the docstring has all three headings, i.e. \"Usage:\","
        print "\"Options:\" and \"Returns:\", exactly as specified, including the colon"
        return "Error!"
    opt_matches = re.findall(OPT_CMD_REGEX, _usage_docstr)
    reqd_matches = re.findall(REQD_CMD_REGEX, _usage_docstr)
    try:
        _options_docstr = _api_tabulate_options(_options_docstr.strip(), obj, reqd_matches, opt_matches)
    except Exception as e:
        print "Please make sure that the individual options are properly formatted"
        print "It should be strictly of the format:"
        print "--command_name=<command_name> : (type) desc"
        print e.message
    docstr = _desc + \
        "Args:\n" + \
        _options_docstr + \
        "\nReturns:" + \
        _returns_docstr
    return docstr


def main():
    root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    build_dir = os.path.realpath(os.path.join(root_dir, DOCS_BUILD_DIR))
    if not os.path.exists(build_dir):
        os.makedirs(build_dir)

    api_doc_path = os.path.join(build_dir, 'index.md')
    cli_doc_path = os.path.join(build_dir, 'cli.md')

    _api_docs = ''
    _cli_docs = ''
    for method_name in sorted(Daemon.callable_methods.keys()):
        method = Daemon.callable_methods[method_name]
        _api_docs += '## ' + method_name + "\n\n```text\n" + _api_doc(method) + "\n```\n\n"
        _cli_docs += '## ' + method_name + "\n\n```text\n" + _cli_doc(method) + "\n```\n\n"

    _api_docs = "# LBRY JSON-RPC API Documentation\n\n" + _api_docs
    with open(api_doc_path, 'w+') as f:
        f.write(_api_docs)

    _cli_docs = "# LBRY JSON-RPC API Documentation\n\n" + _cli_docs
    with open(cli_doc_path, 'w+') as f:
        f.write(_cli_docs)

    try:
        subprocess.check_output("exec mkdocs build", cwd=root_dir, shell=True)
    except subprocess.CalledProcessError as e:
        print e.output
        return 1
    return 0


if __name__ == '__main__':
    sys.exit(main())
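
Both builders assume every docstring in Daemon.callable_methods splits cleanly on the "Usage:", "Options:" and "Returns:" headings, with each option on a `--name=<name> : (type) description` line. A minimal docstring that the parsers above would accept (the method and its option are hypothetical):

def jsonrpc_example(self, name=None):
    """
    Short description of the command

    Usage:
        example [--name=<name>]

    Options:
        --name=<name> : (str) description of the hypothetical option

    Returns:
        (dict) description of the hypothetical result
    """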


@@ -1,110 +0,0 @@
import argparse
import hashlib
import json
import subprocess
import sys

import base58

from lbryum import SimpleConfig, Network
from lbryum.wallet import WalletStorage, Wallet
from lbryum.commands import known_commands, Commands
from lbryum import lbrycrd


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--wallet', help='path to lbryum wallet')
    args = parser.parse_args()

    ensureCliIsOnPathAndServerIsRunning()

    wallet = getWallet(args.wallet)
    addresses = wallet.addresses(True)
    for addr in addresses[:-1]:
        printBalance(wallet, addr)
        saveAddr(wallet, addr)
    # on the last one, rescan. Don't rescan early for sake of efficiency
    addr = addresses[-1]
    printBalance(wallet, addr)
    saveAddr(wallet, addr, "true")


def ensureCliIsOnPathAndServerIsRunning():
    try:
        output = subprocess.check_output(['lbrycrd-cli', 'getinfo'])
    except OSError:
        print 'Failed to run: lbrycrd-cli needs to be on the PATH'
        sys.exit(1)
    except subprocess.CalledProcessError:
        print 'Failed to run: could not connect to the lbrycrd server.'
        print 'Make sure it is running and able to be connected to.'
        print 'One way to do this is to run:'
        print ' lbrycrdd -server -printtoconsole'
        sys.exit(1)


def validateAddress(addr):
    raw_output = subprocess.check_output(
        ['lbrycrd-cli', 'validateaddress', addr])
    output = json.loads(raw_output)
    if not output['isvalid']:
        raise Exception('Address {} is not valid'.format(addr))
    if not output['ismine']:
        raise Exception('Address {} is not yours'.format(addr))


def printBalance(wallet, addr):
    balance = getBalance(wallet, addr)
    print 'Importing private key for %s with balance %s' % (addr, balance)


def getBalance(wallet, addr):
    return sum(wallet.get_addr_balance(addr))


def getWallet(path=None):
    if not path:
        config = SimpleConfig()
        path = config.get_wallet_path()
    storage = WalletStorage(path)
    if not storage.file_exists:
        print "Failed to run: No wallet to migrate"
        sys.exit(1)
    return Wallet(storage)


def saveAddr(wallet, addr, rescan="false"):
    keys = wallet.get_private_key(addr, None)
    assert len(keys) == 1, 'Address {} has {} keys. Expected 1'.format(addr, len(keys))
    key = keys[0]
    # copied from lbrycrd.regenerate_key
    b = lbrycrd.ASecretToSecret(key)
    pkey = b[0:32]
    is_compressed = lbrycrd.is_compressed(key)
    wif = pkeyToWif(pkey, is_compressed)
    subprocess.check_call(
        ['lbrycrd-cli', 'importprivkey', wif, "", rescan])
    validateAddress(addr)


def pkeyToWif(pkey, compressed):
    # Follow https://en.bitcoin.it/wiki/Wallet_import_format
    # to convert from a private key to the wallet import format
    prefix = '\x1c'
    wif = prefix + pkey
    if compressed:
        wif += '\x01'
    intermediate_checksum = hashlib.sha256(wif).digest()
    checksum = hashlib.sha256(intermediate_checksum).digest()
    wif = wif + checksum[:4]
    return base58.b58encode(wif)


def wifToPkey(wif):
    pkey = base58.b58decode(wif)
    return pkey[1:-4]


if __name__ == '__main__':
    sys.exit(main())
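
pkeyToWif follows the standard Bitcoin WIF recipe (version byte, optional 0x01 compression flag, 4-byte double-SHA256 checksum, Base58), using 0x1c as the secret-key prefix. The same round trip as a Python 3 sketch (the bytes handling is the only real change):

import hashlib
import base58

def pkey_to_wif(pkey, compressed):
    wif = b'\x1c' + pkey  # same prefix byte as pkeyToWif above
    if compressed:
        wif += b'\x01'    # marks a compressed public key
    checksum = hashlib.sha256(hashlib.sha256(wif).digest()).digest()[:4]
    return base58.b58encode(wif + checksum).decode()

def wif_to_pkey(wif):
    # strips the prefix and checksum; as in wifToPkey above, a compressed
    # key keeps its trailing 0x01 flag
    return base58.b58decode(wif)[1:-4]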


@@ -1,94 +0,0 @@
"""Reseed a file.
Given a file and a matching sd_blob,
re-chunk and encrypt the file, adding
the new blobs to the manager.
"""
import argparse
import binascii
import logging
import json
import os
import sys
from twisted.internet import defer
from twisted.internet import reactor
from twisted.protocols import basic
from lbrynet import conf
from lbrynet.core import BlobManager
from lbrynet.dht import hashannouncer
from lbrynet.core import log_support
from lbrynet.cryptstream import CryptStreamCreator
log = logging.getLogger('reseed_file')
def main():
conf.initialize_settings()
parser = argparse.ArgumentParser()
parser.add_argument('input_file')
parser.add_argument('sd_blob', help='a json file containing a key and the IVs')
args = parser.parse_args()
log_support.configure_console()
run(args)
reactor.run()
@defer.inlineCallbacks
def run(args):
try:
yield reseed_file(args.input_file, args.sd_blob)
except Exception as e:
log.exception('Failed to reseed')
finally:
reactor.stop()
@defer.inlineCallbacks
def reseed_file(input_file, sd_blob):
sd_blob = SdBlob.new_instance(sd_blob)
db_dir = conf.settings['data_dir']
blobfile_dir = os.path.join(db_dir, "blobfiles")
announcer = hashannouncer.DummyHashAnnouncer()
blob_manager = BlobManager.DiskBlobManager(announcer, blobfile_dir, db_dir)
yield blob_manager.setup()
creator = CryptStreamCreator.CryptStreamCreator(
blob_manager, None, sd_blob.key(), sd_blob.iv_generator())
file_sender = basic.FileSender()
with open(input_file) as f:
yield file_sender.beginFileTransfer(f, creator)
yield creator.stop()
for blob_info in sd_blob.blob_infos():
if 'blob_hash' not in blob_info:
# the last blob is always empty and without a hash
continue
blob = yield blob_manager.get_blob(blob_info['blob_hash'], True)
if not blob.verified:
print "Blob {} is not verified".format(blob)
class SdBlob(object):
def __init__(self, contents):
self.contents = contents
def key(self):
return binascii.unhexlify(self.contents['key'])
def iv_generator(self):
for blob_info in self.blob_infos():
yield binascii.unhexlify(blob_info['iv'])
def blob_infos(self):
return self.contents['blobs']
@classmethod
def new_instance(cls, filename):
with open(filename) as f:
return cls(json.load(f))
if __name__ == '__main__':
sys.exit(main())
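
SdBlob reads only two fields of the descriptor JSON. The shape it assumes looks like this (field names per the accessors above; the hex values are shortened and hypothetical):

sd_blob_contents = {
    "key": "6f94...",                               # hex AES key -> SdBlob.key()
    "blobs": [                                      # -> SdBlob.blob_infos()
        {"blob_hash": "9a3b...", "iv": "3030..."},  # one entry per content blob
        {"iv": "3031..."},                          # last blob is empty: IV but no hash
    ],
}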


@@ -1,228 +0,0 @@
import struct
import json
import logging
import argparse
import hashlib
from copy import deepcopy
from urllib import urlopen

from twisted.internet.epollreactor import install as install_epoll
install_epoll()
from twisted.internet import reactor, defer
from twisted.web import resource
from twisted.web.server import Site

from lbrynet import conf
from lbrynet.dht import constants
from lbrynet.dht.node import Node
from lbrynet.dht.error import TransportNotConnected
from lbrynet.core.log_support import configure_console, configure_twisted
from lbrynet.daemon.auth.server import AuthJSONRPCServer

# configure_twisted()
conf.initialize_settings()
configure_console()
lbrynet_handler = logging.getLogger("lbrynet").handlers[0]
log = logging.getLogger("dht router")
log.addHandler(lbrynet_handler)
log.setLevel(logging.INFO)


def node_id_supplier(seed="jack.lbry.tech"):  # simple deterministic node id generator
    h = hashlib.sha384()
    h.update(seed)
    while True:
        next_id = h.digest()
        yield next_id
        h = hashlib.sha384()
        h.update(seed)
        h.update(next_id)


def get_external_ip():
    response = json.loads(urlopen("https://api.lbry.io/ip").read())
    if not response['success']:
        raise ValueError("failed to get external ip")
    return response['data']['ip']


def format_contact(contact):
    return {
        "node_id": contact.id.encode('hex'),
        "address": contact.address,
        "nodePort": contact.port,
        "lastReplied": contact.lastReplied,
        "lastRequested": contact.lastRequested,
        "failedRPCs": contact.failedRPCs,
        "lastFailed": None if not contact.failures else contact.failures[-1]
    }


def format_datastore(node):
    datastore = deepcopy(node._dataStore._dict)
    result = {}
    for key, values in datastore.items():
        contacts = []
        for (contact, value, last_published, originally_published, original_publisher_id) in values:
            contact_dict = format_contact(contact)
            contact_dict['peerPort'] = struct.unpack('>H', value[4:6])[0]
            contact_dict['lastPublished'] = last_published
            contact_dict['originallyPublished'] = originally_published
            contact_dict['originalPublisherID'] = original_publisher_id.encode('hex')
            contacts.append(contact_dict)
        result[key.encode('hex')] = contacts
    return result


class MultiSeedRPCServer(AuthJSONRPCServer):
    def __init__(self, starting_node_port, nodes, rpc_port):
        AuthJSONRPCServer.__init__(self, False)
        self.port = None
        self.rpc_port = rpc_port
        self.external_ip = get_external_ip()
        node_id_gen = node_id_supplier()
        self._nodes = [Node(node_id=next(node_id_gen), udpPort=starting_node_port + i, externalIP=self.external_ip)
                       for i in range(nodes)]
        self._own_addresses = [(self.external_ip, starting_node_port + i) for i in range(nodes)]
        reactor.addSystemEventTrigger('after', 'startup', self.start)

    @defer.inlineCallbacks
    def start(self):
        self.announced_startup = True
        root = resource.Resource()
        root.putChild('', self)
        self.port = reactor.listenTCP(self.rpc_port, Site(root), interface='localhost')
        log.info("starting %i nodes on %s, rpc available on localhost:%i",
                 len(self._nodes), self.external_ip, self.rpc_port)

        for node in self._nodes:
            node.start_listening()
            yield node._protocol._listening

        for node1 in self._nodes:
            for node2 in self._nodes:
                if node1 is node2:
                    continue
                try:
                    yield node1.addContact(node1.contact_manager.make_contact(node2.node_id, node2.externalIP,
                                                                              node2.port, node1._protocol))
                except TransportNotConnected:
                    pass
            node1.safe_start_looping_call(node1._change_token_lc, constants.tokenSecretChangeInterval)
            node1.safe_start_looping_call(node1._refresh_node_lc, constants.checkRefreshInterval)
            node1._join_deferred = defer.succeed(True)

        reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
        log.info("finished bootstrapping the network, running %i nodes", len(self._nodes))

    @defer.inlineCallbacks
    def stop(self):
        yield self.port.stopListening()
        yield defer.DeferredList([node.stop() for node in self._nodes])

    def jsonrpc_get_node_ids(self):
        return defer.succeed([node.node_id.encode('hex') for node in self._nodes])

    def jsonrpc_node_datastore(self, node_id):
        for node in self._nodes:
            if node.node_id == node_id.decode('hex'):
                return defer.succeed(format_datastore(node))

    def jsonrpc_get_nodes_who_stored(self, blob_hash):
        storing_nodes = {}
        for node in self._nodes:
            datastore = format_datastore(node)
            if blob_hash in datastore:
                storing_nodes[node.node_id.encode('hex')] = datastore[blob_hash]
        return defer.succeed(storing_nodes)

    def jsonrpc_node_routing_table(self, node_id):
        def format_bucket(bucket):
            return {
                "contacts": [format_contact(contact) for contact in bucket._contacts],
                "lastAccessed": bucket.lastAccessed
            }

        def format_routing(node):
            return {
                i: format_bucket(bucket) for i, bucket in enumerate(node._routingTable._buckets)
            }

        for node in self._nodes:
            if node.node_id == node_id.decode('hex'):
                return defer.succeed(format_routing(node))

    def jsonrpc_restart_node(self, node_id):
        for node in self._nodes:
            if node.node_id == node_id.decode('hex'):
                d = node.stop()
                d.addCallback(lambda _: node.start(self._own_addresses))
                return d

    @defer.inlineCallbacks
    def jsonrpc_local_node_rpc(self, from_node, query, args=()):
        def format_result(response):
            if isinstance(response, list):
                return [[node_id.encode('hex'), address, port] for (node_id, address, port) in response]
            if isinstance(response, dict):
                return {'token': response['token'].encode('hex'), 'contacts': format_result(response['contacts'])}
            return response

        for node in self._nodes:
            if node.node_id == from_node.decode('hex'):
                fn = getattr(node, query)
                self_contact = node.contact_manager.make_contact(node.node_id, node.externalIP, node.port, node._protocol)
                if args:
                    args = (str(arg) if isinstance(arg, (str, unicode)) else int(arg) for arg in args)
                    result = yield fn(self_contact, *args)
                else:
                    result = yield fn()
                # print "result: %s" % result
                defer.returnValue(format_result(result))

    @defer.inlineCallbacks
    def jsonrpc_node_rpc(self, from_node, to_node, query, args=()):
        def format_result(response):
            if isinstance(response, list):
                return [[node_id.encode('hex'), address, port] for (node_id, address, port) in response]
            if isinstance(response, dict):
                return {'token': response['token'].encode('hex'), 'contacts': format_result(response['contacts'])}
            return response

        for node in self._nodes:
            if node.node_id == from_node.decode('hex'):
                remote = node._routingTable.getContact(to_node.decode('hex'))
                fn = getattr(remote, query)
                if args:
                    args = (str(arg).decode('hex') for arg in args)
                    result = yield fn(*args)
                else:
                    result = yield fn()
                defer.returnValue(format_result(result))

    @defer.inlineCallbacks
    def jsonrpc_get_nodes_who_know(self, ip_address):
        nodes = []
        for node_id in [n.node_id.encode('hex') for n in self._nodes]:
            routing_info = yield self.jsonrpc_node_routing_table(node_id=node_id)
            for index, bucket in routing_info.items():
                if ip_address in map(lambda c: c['address'], bucket['contacts']):
                    nodes.append(node_id)
                    break
        defer.returnValue(nodes)

    def jsonrpc_node_status(self):
        return defer.succeed({
            node.node_id.encode('hex'): node._join_deferred is not None and node._join_deferred.called
            for node in self._nodes
        })


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--rpc_port', default=5280)
    parser.add_argument('--starting_port', default=4455)
    parser.add_argument('--nodes', default=32)
    args = parser.parse_args()
    MultiSeedRPCServer(int(args.starting_port), int(args.nodes), int(args.rpc_port))
    reactor.run()


if __name__ == "__main__":
    main()
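
node_id_supplier is a simple hash chain over the seed string, so a restarted router regenerates the same node ids in the same order. A worked check of the first two ids (written for Python 3, where the hash updates need bytes):

import hashlib

seed = b"jack.lbry.tech"
first = hashlib.sha384(seed).digest()
second = hashlib.sha384(seed + first).digest()
# first and second equal the first two values yielded by node_id_supplier()
print(first.hex(), second.hex())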