lbry-sdk/lbry/wallet/orchstr8/node.py

439 lines
15 KiB
Python
Raw Normal View History

import os
2019-12-12 04:07:33 +01:00
import json
import shutil
import asyncio
import zipfile
import tarfile
import logging
import tempfile
import subprocess
import importlib
from binascii import hexlify
2018-11-04 00:45:28 +01:00
from typing import Type, Optional
import urllib.request
2020-01-03 04:18:49 +01:00
import lbry
2020-04-11 23:27:41 +02:00
from lbry.db import Database
from lbry.wallet.server.server import Server
from lbry.wallet.server.env import Env
2020-01-03 04:18:49 +01:00
from lbry.wallet import Wallet, Ledger, RegTestLedger, WalletManager, Account, BlockHeightEvent
log = logging.getLogger(__name__)
def get_spvserver_from_ledger(ledger_module):
spvserver_path, regtest_class_name = ledger_module.__spvserver__.rsplit('.', 1)
spvserver_module = importlib.import_module(spvserver_path)
return getattr(spvserver_module, regtest_class_name)
def get_blockchain_node_from_ledger(ledger_module):
return BlockchainNode(
ledger_module.__node_url__,
os.path.join(ledger_module.__node_bin__, ledger_module.__node_daemon__),
os.path.join(ledger_module.__node_bin__, ledger_module.__node_cli__)
)
class Conductor:
2020-01-03 04:18:49 +01:00
def __init__(self, seed=None):
self.manager_module = WalletManager
self.spv_module = get_spvserver_from_ledger(lbry.wallet)
2020-01-03 04:18:49 +01:00
self.blockchain_node = get_blockchain_node_from_ledger(lbry.wallet)
self.spv_node = SPVNode(self.spv_module)
self.wallet_node = WalletNode(
2020-01-03 04:18:49 +01:00
self.manager_module, RegTestLedger, default_seed=seed
)
self.blockchain_started = False
self.spv_started = False
self.wallet_started = False
self.log = log.getChild('conductor')
async def start_blockchain(self):
if not self.blockchain_started:
2020-04-01 19:52:14 +02:00
asyncio.create_task(self.blockchain_node.start())
await self.blockchain_node.running.wait()
await self.blockchain_node.generate(200)
self.blockchain_started = True
async def stop_blockchain(self):
if self.blockchain_started:
await self.blockchain_node.stop(cleanup=True)
self.blockchain_started = False
async def start_spv(self):
if not self.spv_started:
2019-01-14 04:38:36 +01:00
await self.spv_node.start(self.blockchain_node)
self.spv_started = True
async def stop_spv(self):
if self.spv_started:
await self.spv_node.stop(cleanup=True)
self.spv_started = False
async def start_wallet(self):
if not self.wallet_started:
2019-01-14 04:38:36 +01:00
await self.wallet_node.start(self.spv_node)
self.wallet_started = True
async def stop_wallet(self):
if self.wallet_started:
await self.wallet_node.stop(cleanup=True)
self.wallet_started = False
async def start(self):
await self.start_blockchain()
await self.start_spv()
await self.start_wallet()
async def stop(self):
all_the_stops = [
self.stop_wallet,
self.stop_spv,
self.stop_blockchain
]
for stop in all_the_stops:
try:
await stop()
except Exception as e:
log.exception('Exception raised while stopping services:', exc_info=e)
class WalletNode:
2020-01-03 04:18:49 +01:00
def __init__(self, manager_class: Type[WalletManager], ledger_class: Type[Ledger],
verbose: bool = False, port: int = 5280, default_seed: str = None) -> None:
self.manager_class = manager_class
self.ledger_class = ledger_class
self.verbose = verbose
2020-01-03 04:18:49 +01:00
self.manager: Optional[WalletManager] = None
self.ledger: Optional[Ledger] = None
2018-11-04 00:45:28 +01:00
self.wallet: Optional[Wallet] = None
2020-01-03 04:18:49 +01:00
self.account: Optional[Account] = None
2018-11-04 00:45:28 +01:00
self.data_path: Optional[str] = None
2019-01-14 04:38:36 +01:00
self.port = port
self.default_seed = default_seed
2019-02-11 05:11:47 +01:00
async def start(self, spv_node: 'SPVNode', seed=None, connect=True):
self.data_path = tempfile.mkdtemp()
wallets_dir = os.path.join(self.data_path, 'wallets')
os.mkdir(wallets_dir)
wallet_file_name = os.path.join(wallets_dir, 'my_wallet.json')
2018-11-04 00:45:28 +01:00
with open(wallet_file_name, 'w') as wallet_file:
wallet_file.write('{"version": 1, "accounts": []}\n')
2020-04-11 23:27:41 +02:00
db_driver = os.environ.get('TEST_DB', 'sqlite')
if db_driver == 'sqlite':
db = 'sqlite:///'+os.path.join(self.data_path, self.ledger_class.get_id(), 'blockchain.db')
elif db_driver == 'postgres':
2020-04-12 17:06:05 +02:00
db_connection = 'postgres:postgres@localhost:5432'
2020-04-11 23:27:41 +02:00
db_name = f'lbry_test_{self.port}'
2020-04-12 17:06:05 +02:00
meta_db = Database(f'postgresql+psycopg2://{db_connection}/postgres')
2020-04-11 23:27:41 +02:00
await meta_db.drop(db_name)
await meta_db.create(db_name)
2020-04-12 17:06:05 +02:00
db = f'postgresql+psycopg2://{db_connection}/{db_name}'
2020-04-11 23:27:41 +02:00
else:
raise RuntimeError(f"Unsupported database driver: {db_driver}")
self.manager = self.manager_class.from_config({
'ledgers': {
self.ledger_class.get_id(): {
2019-01-14 04:38:36 +01:00
'api_port': self.port,
'default_servers': [(spv_node.hostname, spv_node.port)],
2020-04-11 23:27:41 +02:00
'data_path': self.data_path,
'db': Database(db)
}
},
'wallets': [wallet_file_name]
})
self.ledger = self.manager.ledgers[self.ledger_class]
self.wallet = self.manager.default_wallet
2019-10-13 01:50:12 +02:00
if not self.wallet:
raise ValueError('Wallet is required.')
if seed or self.default_seed:
2020-01-03 04:18:49 +01:00
Account.from_dict(
self.ledger, self.wallet, {'seed': seed or self.default_seed}
2018-11-19 04:54:00 +01:00
)
2019-01-14 04:44:46 +01:00
else:
2019-10-13 01:50:12 +02:00
self.wallet.generate_account(self.ledger)
self.account = self.wallet.default_account
2019-02-11 05:11:47 +01:00
if connect:
await self.manager.start()
async def stop(self, cleanup=True):
try:
await self.manager.stop()
finally:
cleanup and self.cleanup()
def cleanup(self):
shutil.rmtree(self.data_path, ignore_errors=True)
class SPVNode:
def __init__(self, coin_class, node_number=1):
self.coin_class = coin_class
self.controller = None
self.data_path = None
2018-11-04 00:45:28 +01:00
self.server = None
2019-01-14 04:38:36 +01:00
self.hostname = 'localhost'
self.port = 50001 + node_number # avoid conflict with default daemon
2019-07-16 11:23:44 +02:00
self.session_timeout = 600
self.rpc_port = '0' # disabled by default
2019-11-05 08:02:05 +01:00
async def start(self, blockchain_node: 'BlockchainNode', extraconf=None):
self.data_path = tempfile.mkdtemp()
conf = {
2020-01-12 04:48:46 +01:00
'DESCRIPTION': '',
'PAYMENT_ADDRESS': '',
'DAILY_FEE': '0',
'DB_DIRECTORY': self.data_path,
2019-01-14 04:38:36 +01:00
'DAEMON_URL': blockchain_node.rpc_url,
'REORG_LIMIT': '100',
'HOST': self.hostname,
2019-07-11 05:07:58 +02:00
'TCP_PORT': str(self.port),
2019-07-16 11:23:44 +02:00
'SESSION_TIMEOUT': str(self.session_timeout),
'MAX_QUERY_WORKERS': '0',
'INDIVIDUAL_TAG_INDEXES': '',
'RPC_PORT': self.rpc_port
}
2019-11-05 08:02:05 +01:00
if extraconf:
conf.update(extraconf)
2019-01-14 04:38:36 +01:00
# TODO: don't use os.environ
os.environ.update(conf)
self.server = Server(Env(self.coin_class))
2018-12-07 05:13:08 +01:00
self.server.mempool.refresh_secs = self.server.bp.prefetcher.polling_delay = 0.5
await self.server.start()
async def stop(self, cleanup=True):
try:
2018-12-15 04:31:01 +01:00
await self.server.stop()
finally:
cleanup and self.cleanup()
def cleanup(self):
shutil.rmtree(self.data_path, ignore_errors=True)
class BlockchainProcess(asyncio.SubprocessProtocol):
IGNORE_OUTPUT = [
b'keypool keep',
b'keypool reserve',
b'keypool return',
]
def __init__(self):
self.ready = asyncio.Event()
self.stopped = asyncio.Event()
self.log = log.getChild('blockchain')
def pipe_data_received(self, fd, data):
if self.log and not any(ignore in data for ignore in self.IGNORE_OUTPUT):
if b'Error:' in data:
2019-02-11 19:01:07 +01:00
self.log.error(data.decode())
else:
2019-02-11 19:01:07 +01:00
self.log.info(data.decode())
if b'Error:' in data:
self.ready.set()
2019-02-11 19:01:07 +01:00
raise SystemError(data.decode())
2019-02-28 18:00:44 +01:00
if b'Done loading' in data:
self.ready.set()
def process_exited(self):
self.stopped.set()
2020-02-07 21:29:27 +01:00
self.ready.set()
class BlockchainNode:
2019-12-12 04:07:33 +01:00
P2SH_SEGWIT_ADDRESS = "p2sh-segwit"
BECH32_ADDRESS = "bech32"
2020-01-03 04:18:49 +01:00
def __init__(self, url, daemon, cli):
self.latest_release_url = url
self.project_dir = os.path.dirname(os.path.dirname(__file__))
self.bin_dir = os.path.join(self.project_dir, 'bin')
self.daemon_bin = os.path.join(self.bin_dir, daemon)
self.cli_bin = os.path.join(self.bin_dir, cli)
self.log = log.getChild('blockchain')
self.data_path = None
self.protocol = None
self.transport = None
2019-12-20 17:04:21 +01:00
self.block_expected = 0
self.hostname = 'localhost'
self.peerport = 9246 + 2 # avoid conflict with default peer port
self.rpcport = 9245 + 2 # avoid conflict with default rpc port
2019-01-14 04:38:36 +01:00
self.rpcuser = 'rpcuser'
self.rpcpassword = 'rpcpassword'
2020-04-01 19:52:14 +02:00
self.stopped = False
self.restart_ready = asyncio.Event()
self.restart_ready.set()
self.running = asyncio.Event()
2019-01-14 04:38:36 +01:00
@property
def rpc_url(self):
return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.rpcport}/'
def is_expected_block(self, e: BlockHeightEvent):
2019-12-20 17:04:21 +01:00
return self.block_expected == e.height
@property
def exists(self):
return (
os.path.exists(self.cli_bin) and
os.path.exists(self.daemon_bin)
)
def download(self):
downloaded_file = os.path.join(
self.bin_dir,
self.latest_release_url[self.latest_release_url.rfind('/')+1:]
)
if not os.path.exists(self.bin_dir):
os.mkdir(self.bin_dir)
if not os.path.exists(downloaded_file):
self.log.info('Downloading: %s', self.latest_release_url)
with urllib.request.urlopen(self.latest_release_url) as response:
with open(downloaded_file, 'wb') as out_file:
shutil.copyfileobj(response, out_file)
self.log.info('Extracting: %s', downloaded_file)
if downloaded_file.endswith('.zip'):
2018-11-04 00:45:28 +01:00
with zipfile.ZipFile(downloaded_file) as dotzip:
dotzip.extractall(self.bin_dir)
# zipfile bug https://bugs.python.org/issue15795
os.chmod(self.cli_bin, 0o755)
os.chmod(self.daemon_bin, 0o755)
elif downloaded_file.endswith('.tar.gz'):
with tarfile.open(downloaded_file) as tar:
tar.extractall(self.bin_dir)
return self.exists
def ensure(self):
return self.exists or self.download()
async def start(self):
assert self.ensure()
self.data_path = tempfile.mkdtemp()
loop = asyncio.get_event_loop()
asyncio.get_child_watcher().attach_loop(loop)
command = [
self.daemon_bin,
f'-datadir={self.data_path}', '-printtoconsole', '-regtest', '-server', '-txindex',
f'-rpcuser={self.rpcuser}', f'-rpcpassword={self.rpcpassword}', f'-rpcport={self.rpcport}',
f'-port={self.peerport}'
]
self.log.info(' '.join(command))
2020-04-01 19:52:14 +02:00
while not self.stopped:
if self.running.is_set():
await asyncio.sleep(1)
continue
await self.restart_ready.wait()
try:
self.transport, self.protocol = await loop.subprocess_exec(
BlockchainProcess, *command
)
await self.protocol.ready.wait()
assert not self.protocol.stopped.is_set()
self.running.set()
except asyncio.CancelledError:
self.running.clear()
raise
2020-04-01 21:11:45 +02:00
except Exception as e:
2020-04-01 19:52:14 +02:00
self.running.clear()
2020-04-01 21:11:45 +02:00
log.exception('failed to start lbrycrdd', exc_info=e)
async def stop(self, cleanup=True):
2020-04-01 19:52:14 +02:00
self.stopped = True
try:
self.transport.terminate()
await self.protocol.stopped.wait()
2018-12-05 07:13:31 +01:00
self.transport.close()
finally:
if cleanup:
self.cleanup()
2020-04-01 19:52:14 +02:00
async def clear_mempool(self):
self.restart_ready.clear()
self.transport.terminate()
await self.protocol.stopped.wait()
self.transport.close()
self.running.clear()
os.remove(os.path.join(self.data_path, 'regtest', 'mempool.dat'))
self.restart_ready.set()
await self.running.wait()
def cleanup(self):
shutil.rmtree(self.data_path, ignore_errors=True)
async def _cli_cmnd(self, *args):
cmnd_args = [
self.cli_bin, f'-datadir={self.data_path}', '-regtest',
f'-rpcuser={self.rpcuser}', f'-rpcpassword={self.rpcpassword}', f'-rpcport={self.rpcport}'
] + list(args)
self.log.info(' '.join(cmnd_args))
loop = asyncio.get_event_loop()
asyncio.get_child_watcher().attach_loop(loop)
process = await asyncio.create_subprocess_exec(
*cmnd_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
2018-11-04 00:45:28 +01:00
out, _ = await process.communicate()
2019-12-31 23:47:24 +01:00
result = out.decode().strip()
self.log.info(result)
if result.startswith('error code'):
raise Exception(result)
return result
def generate(self, blocks):
2019-12-20 17:04:21 +01:00
self.block_expected += blocks
return self._cli_cmnd('generate', str(blocks))
2019-06-18 23:50:41 +02:00
def invalidate_block(self, blockhash):
2018-11-04 00:45:28 +01:00
return self._cli_cmnd('invalidateblock', blockhash)
2019-06-18 23:50:41 +02:00
def get_block_hash(self, block):
return self._cli_cmnd('getblockhash', str(block))
2020-04-01 21:26:35 +02:00
def sendrawtransaction(self, tx):
return self._cli_cmnd('sendrawtransaction', tx)
2020-04-01 19:52:14 +02:00
async def get_block(self, block_hash):
return json.loads(await self._cli_cmnd('getblock', block_hash, '1'))
def get_raw_change_address(self):
return self._cli_cmnd('getrawchangeaddress')
2019-12-12 15:04:13 +01:00
def get_new_address(self, address_type):
return self._cli_cmnd('getnewaddress', "", address_type)
2019-12-12 04:07:33 +01:00
async def get_balance(self):
return float(await self._cli_cmnd('getbalance'))
2018-11-04 00:45:28 +01:00
def send_to_address(self, address, amount):
return self._cli_cmnd('sendtoaddress', address, str(amount))
def send_raw_transaction(self, tx):
return self._cli_cmnd('sendrawtransaction', tx.decode())
2019-12-12 04:07:33 +01:00
def create_raw_transaction(self, inputs, outputs):
return self._cli_cmnd('createrawtransaction', json.dumps(inputs), json.dumps(outputs))
async def sign_raw_transaction_with_wallet(self, tx):
return json.loads(await self._cli_cmnd('signrawtransactionwithwallet', tx))['hex'].encode()
def decode_raw_transaction(self, tx):
return self._cli_cmnd('decoderawtransaction', hexlify(tx.raw).decode())
def get_raw_transaction(self, txid):
return self._cli_cmnd('getrawtransaction', txid, '1')