Compare commits

...
Sign in to create a new pull request.

14 commits

Author SHA1 Message Date
Roy Lee
09159fbd03 update lbcd and lbcwallet veresion 2021-11-30 23:42:22 -08:00
Brannon King
c27d9c50bd fixing a variety of broken tests, updated getclaimbyid usage 2021-11-26 11:45:40 -05:00
Roy Lee
a719c06615 test: lbcd don't have a dedicated mempool.dat 2021-11-15 12:41:29 -08:00
Roy Lee
79b80d9bbf test: update lbcd to avoid txn being rejected due to munimum fee 2021-11-15 12:41:29 -08:00
Roy Lee
f2fbf23886 HACK: temporary hack to move things along 2021-11-15 12:41:29 -08:00
Roy Lee
caa1b44492 add a rocksdb setup sanity check 2021-11-15 12:41:29 -08:00
Roy Lee
294da1b5aa test: migrate from lbrycrd to lbcd/lbcwallet 2021-11-15 12:41:29 -08:00
Roy Lee
fe06df5ec9 test: support generatetoaddress RPC 2021-11-15 12:41:29 -08:00
Roy Lee
ea6b22f8ef test: support walletpassphrase RPC 2021-11-15 12:41:29 -08:00
Roy Lee
85a96e738c test: getnewaddress RPC takes account name 2021-11-15 12:41:29 -08:00
Roy Lee
89db9f143c test: update some RPC arguments to int type 2021-11-15 12:41:29 -08:00
Roy Lee
56d42cc4e4 test: setup loggers first 2021-11-15 12:41:29 -08:00
Roy Lee
73a09e30d4 Update log level from WARN to INFO 2021-11-15 12:41:29 -08:00
Jack Robison
3f1258ee5d
use rocksdb instead of leveldb
-dont use block processor directly from session manager
2021-11-15 14:27:58 -05:00
20 changed files with 582 additions and 285 deletions

View file

@ -15,7 +15,6 @@ RUN apt-get update && \
build-essential \
automake libtool \
pkg-config \
libleveldb-dev \
python3.7 \
python3-dev \
python3-pip \

View file

@ -18,7 +18,7 @@ from lbry.wallet import WalletManager, Wallet, Ledger, Account, Transaction
from lbry.conf import Config
from lbry.wallet.util import satoshis_to_coins
from lbry.wallet.orchstr8 import Conductor
from lbry.wallet.orchstr8.node import BlockchainNode, WalletNode, HubNode
from lbry.wallet.orchstr8.node import LBCWalletNode, WalletNode, HubNode
from lbry.extras.daemon.daemon import Daemon, jsonrpc_dumps_pretty
from lbry.extras.daemon.components import Component, WalletComponent
@ -222,7 +222,7 @@ class IntegrationTestCase(AsyncioTestCase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.conductor: Optional[Conductor] = None
self.blockchain: Optional[BlockchainNode] = None
self.blockchain: Optional[LBCWalletNode] = None
self.hub: Optional[HubNode] = None
self.wallet_node: Optional[WalletNode] = None
self.manager: Optional[WalletManager] = None
@ -232,15 +232,17 @@ class IntegrationTestCase(AsyncioTestCase):
async def asyncSetUp(self):
self.conductor = Conductor(seed=self.SEED)
await self.conductor.start_blockchain()
self.addCleanup(self.conductor.stop_blockchain)
await self.conductor.start_lbcd()
self.addCleanup(self.conductor.stop_lbcd)
await self.conductor.start_lbcwallet()
self.addCleanup(self.conductor.stop_lbcwallet)
await self.conductor.start_spv()
self.addCleanup(self.conductor.stop_spv)
await self.conductor.start_wallet()
self.addCleanup(self.conductor.stop_wallet)
await self.conductor.start_hub()
self.addCleanup(self.conductor.stop_hub)
self.blockchain = self.conductor.blockchain_node
self.blockchain = self.conductor.lbcwallet_node
self.hub = self.conductor.hub_node
self.wallet_node = self.conductor.wallet_node
self.manager = self.wallet_node.manager
@ -320,7 +322,7 @@ class ExchangeRateManagerComponent(Component):
class CommandTestCase(IntegrationTestCase):
VERBOSITY = logging.WARN
VERBOSITY = logging.INFO
blob_lru_cache_size = 0
def __init__(self, *args, **kwargs):
@ -337,13 +339,14 @@ class CommandTestCase(IntegrationTestCase):
self.skip_libtorrent = True
async def asyncSetUp(self):
await super().asyncSetUp()
logging.getLogger('lbry.blob_exchange').setLevel(self.VERBOSITY)
logging.getLogger('lbry.daemon').setLevel(self.VERBOSITY)
logging.getLogger('lbry.stream').setLevel(self.VERBOSITY)
logging.getLogger('lbry.wallet').setLevel(self.VERBOSITY)
await super().asyncSetUp()
self.daemon = await self.add_daemon(self.wallet_node)
await self.account.ensure_address_gap()

View file

@ -1,8 +1,13 @@
__node_daemon__ = 'lbrycrdd'
__node_cli__ = 'lbrycrd-cli'
__node_bin__ = ''
__node_url__ = (
'https://github.com/lbryio/lbrycrd/releases/download/v0.17.4.6/lbrycrd-linux-1746.zip'
__lbcd__ = 'lbcd'
__lbcctl__ = 'lbcctl'
__lbcwallet__ = 'lbcwallet'
__lbcd_url__ = (
'https://github.com/lbryio/lbcd/releases/download/' +
'v0.22.100-beta-rc4/lbcd_0.22.100-beta-rc4_TARGET_PLATFORM.tar.gz'
)
__lbcwallet_url__ = (
'https://github.com/lbryio/lbcwallet/releases/download/' +
'v0.12.100-alpha-rc2/lbcwallet_0.12.100-alpha-rc2_TARGET_PLATFORM.tar.gz'
)
__spvserver__ = 'lbry.wallet.server.coin.LBCRegTest'

View file

@ -5,7 +5,9 @@ import aiohttp
from lbry import wallet
from lbry.wallet.orchstr8.node import (
Conductor, get_blockchain_node_from_ledger
Conductor,
get_lbcd_node_from_ledger,
get_lbcwallet_node_from_ledger
)
from lbry.wallet.orchstr8.service import ConductorService
@ -16,10 +18,11 @@ def get_argument_parser():
)
subparsers = parser.add_subparsers(dest='command', help='sub-command help')
subparsers.add_parser("download", help="Download blockchain node binary.")
subparsers.add_parser("download", help="Download lbcd and lbcwallet node binaries.")
start = subparsers.add_parser("start", help="Start orchstr8 service.")
start.add_argument("--blockchain", help="Hostname to start blockchain node.")
start.add_argument("--lbcd", help="Hostname to start lbcd node.")
start.add_argument("--lbcwallet", help="Hostname to start lbcwallet node.")
start.add_argument("--spv", help="Hostname to start SPV server.")
start.add_argument("--wallet", help="Hostname to start wallet daemon.")
@ -47,7 +50,8 @@ def main():
if command == 'download':
logging.getLogger('blockchain').setLevel(logging.INFO)
get_blockchain_node_from_ledger(wallet).ensure()
get_lbcd_node_from_ledger(wallet).ensure()
get_lbcwallet_node_from_ledger(wallet).ensure()
elif command == 'generate':
loop.run_until_complete(run_remote_command(
@ -57,9 +61,12 @@ def main():
elif command == 'start':
conductor = Conductor()
if getattr(args, 'blockchain', False):
conductor.blockchain_node.hostname = args.blockchain
loop.run_until_complete(conductor.start_blockchain())
if getattr(args, 'lbcd', False):
conductor.lbcd_node.hostname = args.lbcd
loop.run_until_complete(conductor.start_lbcd())
if getattr(args, 'lbcwallet', False):
conductor.lbcwallet_node.hostname = args.lbcwallet
loop.run_until_complete(conductor.start_lbcwallet())
if getattr(args, 'spv', False):
conductor.spv_node.hostname = args.spv
loop.run_until_complete(conductor.start_spv())

View file

@ -8,6 +8,7 @@ import logging
import tempfile
import subprocess
import importlib
import platform
from distutils.util import strtobool
from binascii import hexlify
@ -31,11 +32,18 @@ def get_spvserver_from_ledger(ledger_module):
return getattr(spvserver_module, regtest_class_name)
def get_blockchain_node_from_ledger(ledger_module):
return BlockchainNode(
ledger_module.__node_url__,
os.path.join(ledger_module.__node_bin__, ledger_module.__node_daemon__),
os.path.join(ledger_module.__node_bin__, ledger_module.__node_cli__)
def get_lbcd_node_from_ledger(ledger_module):
return LBCDNode(
ledger_module.__lbcd_url__,
ledger_module.__lbcd__,
ledger_module.__lbcctl__
)
def get_lbcwallet_node_from_ledger(ledger_module):
return LBCWalletNode(
ledger_module.__lbcwallet_url__,
ledger_module.__lbcwallet__,
ledger_module.__lbcctl__
)
@ -45,51 +53,52 @@ class Conductor:
self.manager_module = WalletManager
self.spv_module = get_spvserver_from_ledger(lbry.wallet)
self.blockchain_node = get_blockchain_node_from_ledger(lbry.wallet)
self.lbcd_node = get_lbcd_node_from_ledger(lbry.wallet)
self.lbcwallet_node = get_lbcwallet_node_from_ledger(lbry.wallet)
self.spv_node = SPVNode(self.spv_module)
self.wallet_node = WalletNode(
self.manager_module, RegTestLedger, default_seed=seed
)
self.hub_node = HubNode(__hub_url__, "hub", self.spv_node)
self.blockchain_started = False
self.lbcd_started = False
self.lbcwallet_started = False
self.spv_started = False
self.wallet_started = False
self.hub_started = False
self.log = log.getChild('conductor')
async def start_blockchain(self):
if not self.blockchain_started:
asyncio.create_task(self.blockchain_node.start())
await self.blockchain_node.running.wait()
await self.blockchain_node.generate(200)
self.blockchain_started = True
async def start_lbcd(self):
if not self.lbcd_started:
asyncio.create_task(self.lbcd_node.start())
await self.lbcd_node.running.wait()
self.lbcd_started = True
async def stop_blockchain(self):
if self.blockchain_started:
await self.blockchain_node.stop(cleanup=True)
self.blockchain_started = False
async def stop_lbcd(self, cleanup=True):
if self.lbcd_started:
await self.lbcd_node.stop(cleanup)
self.lbcd_started = False
async def start_hub(self):
if not self.hub_started:
asyncio.create_task(self.hub_node.start())
await self.blockchain_node.running.wait()
await self.lbcwallet_node.running.wait()
self.hub_started = True
async def stop_hub(self):
async def stop_hub(self, cleanup=True):
if self.hub_started:
await self.hub_node.stop(cleanup=True)
await self.hub_node.stop(cleanup)
self.hub_started = False
async def start_spv(self):
if not self.spv_started:
await self.spv_node.start(self.blockchain_node)
await self.spv_node.start(self.lbcwallet_node)
self.spv_started = True
async def stop_spv(self):
async def stop_spv(self, cleanup=True):
if self.spv_started:
await self.spv_node.stop(cleanup=True)
await self.spv_node.stop(cleanup)
self.spv_started = False
async def start_wallet(self):
@ -97,13 +106,31 @@ class Conductor:
await self.wallet_node.start(self.spv_node)
self.wallet_started = True
async def stop_wallet(self):
async def stop_wallet(self, cleanup=True):
if self.wallet_started:
await self.wallet_node.stop(cleanup=True)
await self.wallet_node.stop(cleanup)
self.wallet_started = False
async def start_lbcwallet(self, clean=True):
if not self.lbcwallet_started:
asyncio.create_task(self.lbcwallet_node.start())
await self.lbcwallet_node.running.wait()
if clean:
mining_addr = await self.lbcwallet_node.get_new_address()
self.lbcwallet_node.mining_addr = mining_addr
await self.lbcwallet_node.generate(200)
# unlock the wallet for the next 1 hour
await self.lbcwallet_node.wallet_passphrase("password", 3600)
self.lbcwallet_started = True
async def stop_lbcwallet(self, cleanup=True):
if self.lbcwallet_started:
await self.lbcwallet_node.stop(cleanup)
self.lbcwallet_started = False
async def start(self):
await self.start_blockchain()
await self.start_lbcd()
await self.start_lbcwallet()
await self.start_spv()
await self.start_wallet()
@ -111,7 +138,8 @@ class Conductor:
all_the_stops = [
self.stop_wallet,
self.stop_spv,
self.stop_blockchain
self.stop_lbcwallet,
self.stop_lbcd
]
for stop in all_the_stops:
try:
@ -119,6 +147,11 @@ class Conductor:
except Exception as e:
log.exception('Exception raised while stopping services:', exc_info=e)
async def clear_mempool(self):
await self.stop_lbcwallet(cleanup=False)
await self.stop_lbcd(cleanup=False)
await self.start_lbcd()
await self.start_lbcwallet(clean=False)
class WalletNode:
@ -139,10 +172,11 @@ class WalletNode:
async def start(self, spv_node: 'SPVNode', seed=None, connect=True, config=None):
wallets_dir = os.path.join(self.data_path, 'wallets')
os.mkdir(wallets_dir)
wallet_file_name = os.path.join(wallets_dir, 'my_wallet.json')
with open(wallet_file_name, 'w') as wallet_file:
wallet_file.write('{"version": 1, "accounts": []}\n')
if not os.path.isdir(wallets_dir):
os.mkdir(wallets_dir)
with open(wallet_file_name, 'w') as wallet_file:
wallet_file.write('{"version": 1, "accounts": []}\n')
self.manager = self.manager_class.from_config({
'ledgers': {
self.ledger_class.get_id(): {
@ -198,14 +232,14 @@ class SPVNode:
self.stopped = False
self.index_name = uuid4().hex
async def start(self, blockchain_node: 'BlockchainNode', extraconf=None):
async def start(self, lbcwallet_node: 'LBCWalletNode', extraconf=None):
self.data_path = tempfile.mkdtemp()
conf = {
'DESCRIPTION': '',
'PAYMENT_ADDRESS': '',
'DAILY_FEE': '0',
'DB_DIRECTORY': self.data_path,
'DAEMON_URL': blockchain_node.rpc_url,
'DAEMON_URL': lbcwallet_node.rpc_url,
'REORG_LIMIT': '100',
'HOST': self.hostname,
'TCP_PORT': str(self.port),
@ -240,18 +274,19 @@ class SPVNode:
shutil.rmtree(self.data_path, ignore_errors=True)
class BlockchainProcess(asyncio.SubprocessProtocol):
class LBCDProcess(asyncio.SubprocessProtocol):
IGNORE_OUTPUT = [
b'keypool keep',
b'keypool reserve',
b'keypool return',
b'Block submitted',
]
def __init__(self):
self.ready = asyncio.Event()
self.stopped = asyncio.Event()
self.log = log.getChild('blockchain')
self.log = log.getChild('lbcd')
def pipe_data_received(self, fd, data):
if self.log and not any(ignore in data for ignore in self.IGNORE_OUTPUT):
@ -262,7 +297,7 @@ class BlockchainProcess(asyncio.SubprocessProtocol):
if b'Error:' in data:
self.ready.set()
raise SystemError(data.decode())
if b'Done loading' in data:
if b'RPCS: RPC server listening on' in data:
self.ready.set()
def process_exited(self):
@ -270,10 +305,34 @@ class BlockchainProcess(asyncio.SubprocessProtocol):
self.ready.set()
class BlockchainNode:
class WalletProcess(asyncio.SubprocessProtocol):
P2SH_SEGWIT_ADDRESS = "p2sh-segwit"
BECH32_ADDRESS = "bech32"
IGNORE_OUTPUT = [
]
def __init__(self):
self.ready = asyncio.Event()
self.stopped = asyncio.Event()
self.log = log.getChild('lbcwallet')
self.transport: Optional[asyncio.transports.SubprocessTransport] = None
def pipe_data_received(self, fd, data):
if self.log and not any(ignore in data for ignore in self.IGNORE_OUTPUT):
if b'Error:' in data:
self.log.error(data.decode())
else:
self.log.info(data.decode())
if b'Error:' in data:
self.ready.set()
raise SystemError(data.decode())
if b'WLLT: Finished rescan' in data:
self.ready.set()
def process_exited(self):
self.stopped.set()
self.ready.set()
class LBCDNode:
def __init__(self, url, daemon, cli):
self.latest_release_url = url
@ -281,28 +340,22 @@ class BlockchainNode:
self.bin_dir = os.path.join(self.project_dir, 'bin')
self.daemon_bin = os.path.join(self.bin_dir, daemon)
self.cli_bin = os.path.join(self.bin_dir, cli)
self.log = log.getChild('blockchain')
self.data_path = None
self.log = log.getChild('lbcd')
self.data_path = tempfile.mkdtemp()
self.protocol = None
self.transport = None
self.block_expected = 0
self.hostname = 'localhost'
self.peerport = 9246 + 2 # avoid conflict with default peer port
self.rpcport = 9245 + 2 # avoid conflict with default rpc port
self.peerport = 29246
self.rpcport = 29245
self.rpcuser = 'rpcuser'
self.rpcpassword = 'rpcpassword'
self.stopped = False
self.restart_ready = asyncio.Event()
self.restart_ready.set()
self.stopped = True
self.running = asyncio.Event()
@property
def rpc_url(self):
return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.rpcport}/'
def is_expected_block(self, e: BlockHeightEvent):
return self.block_expected == e.height
@property
def exists(self):
return (
@ -311,6 +364,12 @@ class BlockchainNode:
)
def download(self):
uname = platform.uname()
target_os = str.lower(uname.system)
target_arch = str.replace(uname.machine, 'x86_64', 'amd64')
target_platform = target_os + '_' + target_arch
self.latest_release_url = str.replace(self.latest_release_url, 'TARGET_PLATFORM', target_platform)
downloaded_file = os.path.join(
self.bin_dir,
self.latest_release_url[self.latest_release_url.rfind('/')+1:]
@ -345,34 +404,34 @@ class BlockchainNode:
async def start(self):
assert self.ensure()
self.data_path = tempfile.mkdtemp()
loop = asyncio.get_event_loop()
asyncio.get_child_watcher().attach_loop(loop)
command = [
self.daemon_bin,
f'-datadir={self.data_path}', '-printtoconsole', '-regtest', '-server', '-txindex',
f'-rpcuser={self.rpcuser}', f'-rpcpassword={self.rpcpassword}', f'-rpcport={self.rpcport}',
f'-port={self.peerport}'
'--notls',
f'--datadir={self.data_path}',
'--regtest', f'--listen=127.0.0.1:{self.peerport}', f'--rpclisten=127.0.0.1:{self.rpcport}',
'--txindex', f'--rpcuser={self.rpcuser}', f'--rpcpass={self.rpcpassword}'
]
self.log.info(' '.join(command))
while not self.stopped:
while self.stopped:
if self.running.is_set():
await asyncio.sleep(1)
continue
await self.restart_ready.wait()
try:
self.transport, self.protocol = await loop.subprocess_exec(
BlockchainProcess, *command
LBCDProcess, *command
)
await self.protocol.ready.wait()
assert not self.protocol.stopped.is_set()
self.running.set()
self.stopped = False
except asyncio.CancelledError:
self.running.clear()
raise
except Exception as e:
self.running.clear()
log.exception('failed to start lbrycrdd', exc_info=e)
log.exception('failed to start lbcd', exc_info=e)
async def stop(self, cleanup=True):
self.stopped = True
@ -381,26 +440,147 @@ class BlockchainNode:
await self.protocol.stopped.wait()
self.transport.close()
finally:
self.log.info("Done shutting down " + self.daemon_bin)
if cleanup:
self.cleanup()
async def clear_mempool(self):
self.restart_ready.clear()
self.transport.terminate()
await self.protocol.stopped.wait()
self.transport.close()
self.running.clear()
os.remove(os.path.join(self.data_path, 'regtest', 'mempool.dat'))
self.restart_ready.set()
await self.running.wait()
self.running.clear()
def cleanup(self):
assert self.stopped
shutil.rmtree(self.data_path, ignore_errors=True)
class LBCWalletNode:
P2SH_SEGWIT_ADDRESS = "p2sh-segwit"
BECH32_ADDRESS = "bech32"
def __init__(self, url, lbcwallet, cli):
self.latest_release_url = url
self.project_dir = os.path.dirname(os.path.dirname(__file__))
self.bin_dir = os.path.join(self.project_dir, 'bin')
self.lbcwallet_bin = os.path.join(self.bin_dir, lbcwallet)
self.cli_bin = os.path.join(self.bin_dir, cli)
self.log = log.getChild('lbcwallet')
self.protocol = None
self.transport = None
self.hostname = 'localhost'
self.lbcd_rpcport = 29245
self.lbcwallet_rpcport = 29244
self.rpcuser = 'rpcuser'
self.rpcpassword = 'rpcpassword'
self.data_path = tempfile.mkdtemp()
self.stopped = True
self.running = asyncio.Event()
self.block_expected = 0
self.mining_addr = ''
@property
def rpc_url(self):
# FIXME: somehow the hub/sdk doesn't learn the blocks through the Walet RPC port, why?
# return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.lbcwallet_rpcport}/'
return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.lbcd_rpcport}/'
def is_expected_block(self, e: BlockHeightEvent):
return self.block_expected == e.height
@property
def exists(self):
return (
os.path.exists(self.lbcwallet_bin)
)
def download(self):
uname = platform.uname()
target_os = str.lower(uname.system)
target_arch = str.replace(uname.machine, 'x86_64', 'amd64')
target_platform = target_os + '_' + target_arch
self.latest_release_url = str.replace(self.latest_release_url, 'TARGET_PLATFORM', target_platform)
downloaded_file = os.path.join(
self.bin_dir,
self.latest_release_url[self.latest_release_url.rfind('/')+1:]
)
if not os.path.exists(self.bin_dir):
os.mkdir(self.bin_dir)
if not os.path.exists(downloaded_file):
self.log.info('Downloading: %s', self.latest_release_url)
with urllib.request.urlopen(self.latest_release_url) as response:
with open(downloaded_file, 'wb') as out_file:
shutil.copyfileobj(response, out_file)
self.log.info('Extracting: %s', downloaded_file)
if downloaded_file.endswith('.zip'):
with zipfile.ZipFile(downloaded_file) as dotzip:
dotzip.extractall(self.bin_dir)
# zipfile bug https://bugs.python.org/issue15795
os.chmod(self.lbcwallet_bin, 0o755)
elif downloaded_file.endswith('.tar.gz'):
with tarfile.open(downloaded_file) as tar:
tar.extractall(self.bin_dir)
return self.exists
def ensure(self):
return self.exists or self.download()
async def start(self):
assert self.ensure()
loop = asyncio.get_event_loop()
asyncio.get_child_watcher().attach_loop(loop)
command = [
self.lbcwallet_bin,
'--noservertls', '--noclienttls',
'--regtest',
f'--rpcconnect=127.0.0.1:{self.lbcd_rpcport}', f'--rpclisten=127.0.0.1:{self.lbcwallet_rpcport}',
'--createtemp', f'--appdata={self.data_path}',
f'--username={self.rpcuser}', f'--password={self.rpcpassword}'
]
self.log.info(' '.join(command))
while self.stopped:
if self.running.is_set():
await asyncio.sleep(1)
continue
try:
self.transport, self.protocol = await loop.subprocess_exec(
WalletProcess, *command
)
self.protocol.transport = self.transport
await self.protocol.ready.wait()
assert not self.protocol.stopped.is_set()
self.running.set()
self.stopped = False
except asyncio.CancelledError:
self.running.clear()
raise
except Exception as e:
self.running.clear()
log.exception('failed to start lbcwallet', exc_info=e)
def cleanup(self):
assert self.stopped
shutil.rmtree(self.data_path, ignore_errors=True)
async def stop(self, cleanup=True):
self.stopped = True
try:
self.transport.terminate()
await self.protocol.stopped.wait()
self.transport.close()
finally:
self.log.info("Done shutting down " + self.lbcwallet_bin)
if cleanup:
self.cleanup()
self.running.clear()
async def _cli_cmnd(self, *args):
cmnd_args = [
self.cli_bin, f'-datadir={self.data_path}', '-regtest',
f'-rpcuser={self.rpcuser}', f'-rpcpassword={self.rpcpassword}', f'-rpcport={self.rpcport}'
self.cli_bin,
f'--rpcuser={self.rpcuser}', f'--rpcpass={self.rpcpassword}', '--notls', '--regtest', '--wallet'
] + list(args)
self.log.info(' '.join(cmnd_args))
loop = asyncio.get_event_loop()
@ -417,7 +597,14 @@ class BlockchainNode:
def generate(self, blocks):
self.block_expected += blocks
return self._cli_cmnd('generate', str(blocks))
return self._cli_cmnd('generatetoaddress', str(blocks), self.mining_addr)
def generate_to_address(self, blocks, addr):
self.block_expected += blocks
return self._cli_cmnd('generatetoaddress', str(blocks), addr)
def wallet_passphrase(self, passphrase, timeout):
return self._cli_cmnd('walletpassphrase', passphrase, str(timeout))
def invalidate_block(self, blockhash):
return self._cli_cmnd('invalidateblock', blockhash)
@ -434,7 +621,7 @@ class BlockchainNode:
def get_raw_change_address(self):
return self._cli_cmnd('getrawchangeaddress')
def get_new_address(self, address_type):
def get_new_address(self, address_type='legacy'):
return self._cli_cmnd('getnewaddress', "", address_type)
async def get_balance(self):
@ -450,7 +637,10 @@ class BlockchainNode:
return self._cli_cmnd('createrawtransaction', json.dumps(inputs), json.dumps(outputs))
async def sign_raw_transaction_with_wallet(self, tx):
return json.loads(await self._cli_cmnd('signrawtransactionwithwallet', tx))['hex'].encode()
# the "withwallet" portion should only come into play if we are doing segwit.
# and "withwallet" doesn't exist on lbcd yet.
result = await self._cli_cmnd('signrawtransaction', tx)
return json.loads(result)['hex'].encode()
def decode_raw_transaction(self, tx):
return self._cli_cmnd('decoderawtransaction', hexlify(tx.raw).decode())
@ -496,8 +686,6 @@ class HubNode:
self.hostname = 'localhost'
self.rpcport = 50051 # avoid conflict with default rpc port
self.stopped = False
self.restart_ready = asyncio.Event()
self.restart_ready.set()
self.running = asyncio.Event()
@property
@ -554,7 +742,6 @@ class HubNode:
if self.running.is_set():
await asyncio.sleep(1)
continue
await self.restart_ready.wait()
try:
if not self.debug:
self.transport, self.protocol = await loop.subprocess_exec(

View file

@ -61,8 +61,10 @@ class ConductorService:
#set_logging(
# self.stack.ledger_module, logging.DEBUG, WebSocketLogHandler(self.send_message)
#)
self.stack.blockchain_started or await self.stack.start_blockchain()
self.send_message({'type': 'service', 'name': 'blockchain', 'port': self.stack.blockchain_node.port})
self.stack.lbcd_started or await self.stack.start_lbcd()
self.send_message({'type': 'service', 'name': 'lbcd', 'port': self.stack.lbcd_node.port})
self.stack.lbcwallet_started or await self.stack.start_lbcwallet()
self.send_message({'type': 'service', 'name': 'lbcwallet', 'port': self.stack.lbcwallet_node.port})
self.stack.spv_started or await self.stack.start_spv()
self.send_message({'type': 'service', 'name': 'spv', 'port': self.stack.spv_node.port})
self.stack.wallet_started or await self.stack.start_wallet()
@ -74,7 +76,7 @@ class ConductorService:
async def generate(self, request):
data = await request.post()
blocks = data.get('blocks', 1)
await self.stack.blockchain_node.generate(int(blocks))
await self.stack.lbcwallet_node.generate(int(blocks))
return json_response({'blocks': blocks})
async def transfer(self, request):
@ -85,7 +87,7 @@ class ConductorService:
if not address:
raise ValueError("No address was provided.")
amount = data.get('amount', 1)
txid = await self.stack.blockchain_node.send_to_address(address, amount)
txid = await self.stack.lbcwallet_node.send_to_address(address, amount)
if self.stack.wallet_started:
await self.stack.wallet_node.ledger.on_transaction.where(
lambda e: e.tx.id == txid and e.address == address
@ -98,7 +100,7 @@ class ConductorService:
async def balance(self, _):
return json_response({
'balance': await self.stack.blockchain_node.get_balance()
'balance': await self.stack.lbcwallet_node.get_balance()
})
async def log(self, request):
@ -129,7 +131,7 @@ class ConductorService:
'type': 'status',
'height': self.stack.wallet_node.ledger.headers.height,
'balance': satoshis_to_coins(await self.stack.wallet_node.account.get_balance()),
'miner': await self.stack.blockchain_node.get_balance()
'miner': await self.stack.lbcwallet_node.get_balance()
})
def send_message(self, msg):

View file

@ -250,14 +250,14 @@ class Daemon:
async def deserialised_block(self, hex_hash):
"""Return the deserialised block with the given hex hash."""
if hex_hash not in self._block_cache:
block = await self._send_single('getblock', (hex_hash, True))
block = await self._send_single('getblock', (hex_hash, 1))
self._block_cache[hex_hash] = block
return block
return self._block_cache[hex_hash]
async def raw_blocks(self, hex_hashes):
"""Return the raw binary blocks with the given hex hashes."""
params_iterable = ((h, False) for h in hex_hashes)
params_iterable = ((h, 0) for h in hex_hashes)
blocks = await self._send_vector('getblock', params_iterable)
# Convert hex string to bytes
return [hex_to_bytes(block) for block in blocks]
@ -334,42 +334,7 @@ class LBCDaemon(Daemon):
async def getrawtransaction(self, hex_hash, verbose=False):
return await super().getrawtransaction(hex_hash=hex_hash, verbose=verbose)
@handles_errors
async def getclaimbyid(self, claim_id):
'''Given a claim id, retrieves claim information.'''
return await self._send_single('getclaimbyid', (claim_id,))
@handles_errors
async def getclaimsbyids(self, claim_ids):
'''Given a list of claim ids, batches calls to retrieve claim information.'''
return await self._send_vector('getclaimbyid', ((claim_id,) for claim_id in claim_ids))
@handles_errors
async def getclaimsforname(self, name):
'''Given a name, retrieves all claims matching that name.'''
return await self._send_single('getclaimsforname', (name,))
@handles_errors
async def getclaimsfortx(self, txid):
'''Given a txid, returns the claims it make.'''
return await self._send_single('getclaimsfortx', (txid,)) or []
@handles_errors
async def getnameproof(self, name, block_hash=None):
'''Given a name and optional block_hash, returns a name proof and winner, if any.'''
return await self._send_single('getnameproof', (name, block_hash,) if block_hash else (name,))
@handles_errors
async def getvalueforname(self, name):
'''Given a name, returns the winning claim value.'''
return await self._send_single('getvalueforname', (name,))
@handles_errors
async def getnamesintrie(self):
'''Given a name, returns the winning claim value.'''
return await self._send_single('getnamesintrie')
@handles_errors
async def claimname(self, name, hexvalue, amount):
'''Claim a name, used for functional tests only.'''
return await self._send_single('claimname', (name, hexvalue, float(amount)))

View file

@ -1,32 +1,130 @@
import struct
import rocksdb
from typing import Optional
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
class KeyValueStorage:
class RocksDBStore:
def __init__(self, path: str, cache_mb: int, max_open_files: int, secondary_path: str = ''):
# Use snappy compression (the default)
self.path = path
self._max_open_files = max_open_files
self.db = rocksdb.DB(path, self.get_options(), secondary_name=secondary_path)
# self.multi_get = self.db.multi_get
def get_options(self):
return rocksdb.Options(
create_if_missing=True, use_fsync=True, target_file_size_base=33554432,
max_open_files=self._max_open_files
)
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
raise NotImplemented()
return self.db.get(key, fill_cache=fill_cache)
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
include_key=True, include_value=True, fill_cache=True):
raise NotImplemented()
return RocksDBIterator(
self.db, reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
prefix=prefix, include_key=include_key, include_value=include_value
)
def write_batch(self, transaction: bool = False):
raise NotImplemented()
def write_batch(self, disable_wal: bool = False, sync: bool = False):
return RocksDBWriteBatch(self.db, sync=sync, disable_wal=disable_wal)
def close(self):
raise NotImplemented()
self.db.close()
self.db = None
@property
def closed(self) -> bool:
raise NotImplemented()
return self.db is None
def try_catch_up_with_primary(self):
self.db.try_catch_up_with_primary()
class RocksDBWriteBatch:
def __init__(self, db: rocksdb.DB, sync: bool = False, disable_wal: bool = False):
self.batch = rocksdb.WriteBatch()
self.db = db
self.sync = sync
self.disable_wal = disable_wal
def __enter__(self):
return self.batch
def __exit__(self, exc_type, exc_val, exc_tb):
if not exc_val:
self.db.write(self.batch, sync=self.sync, disable_wal=self.disable_wal)
class RocksDBIterator:
"""An iterator for RocksDB."""
__slots__ = [
'start',
'prefix',
'stop',
'iterator',
'include_key',
'include_value',
'prev_k',
'reverse',
'include_start',
'include_stop'
]
def __init__(self, db: rocksdb.DB, prefix: bytes = None, start: bool = None, stop: bytes = None,
include_key: bool = True, include_value: bool = True, reverse: bool = False,
include_start: bool = True, include_stop: bool = False):
assert (start is None and stop is None) or (prefix is None), 'cannot use start/stop and prefix'
self.start = start
self.prefix = prefix
self.stop = stop
self.iterator = db.iteritems() if not reverse else reversed(db.iteritems())
if prefix is not None:
self.iterator.seek(prefix)
elif start is not None:
self.iterator.seek(start)
self.include_key = include_key
self.include_value = include_value
self.prev_k = None
self.reverse = reverse
self.include_start = include_start
self.include_stop = include_stop
def __iter__(self):
return self
def _check_stop_iteration(self, key: bytes):
if self.stop is not None and (key.startswith(self.stop) or self.stop < key[:len(self.stop)]):
raise StopIteration
elif self.start is not None and self.start > key[:len(self.start)]:
raise StopIteration
elif self.prefix is not None and not key.startswith(self.prefix):
raise StopIteration
def __next__(self):
# TODO: include start/stop on/off
# check for needing to stop from previous iteration
if self.prev_k is not None:
self._check_stop_iteration(self.prev_k)
k, v = next(self.iterator)
self._check_stop_iteration(k)
self.prev_k = k
if self.include_key and self.include_value:
return k, v
elif self.include_key:
return k
return v
class PrefixDB:
UNDO_KEY_STRUCT = struct.Struct(b'>Q')
def __init__(self, db: KeyValueStorage, max_undo_depth: int = 200, unsafe_prefixes=None):
def __init__(self, db: RocksDBStore, max_undo_depth: int = 200, unsafe_prefixes=None):
self._db = db
self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes)
self._max_undo_depth = max_undo_depth
@ -37,7 +135,7 @@ class PrefixDB:
Changes written cannot be undone
"""
try:
with self._db.write_batch(transaction=True) as batch:
with self._db.write_batch(sync=True) as batch:
batch_put = batch.put
batch_delete = batch.delete
for staged_change in self._op_stack:
@ -61,7 +159,7 @@ class PrefixDB:
include_value=False
))
try:
with self._db.write_batch(transaction=True) as batch:
with self._db.write_batch(sync=True) as batch:
batch_put = batch.put
batch_delete = batch.delete
for staged_change in self._op_stack:
@ -82,7 +180,7 @@ class PrefixDB:
undo_key = DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height)
self._op_stack.apply_packed_undo_ops(self._db.get(undo_key))
try:
with self._db.write_batch(transaction=True) as batch:
with self._db.write_batch(sync=True) as batch:
batch_put = batch.put
batch_delete = batch.delete
for staged_change in self._op_stack:
@ -108,6 +206,9 @@ class PrefixDB:
if not self._db.closed:
self._db.close()
def try_catch_up_with_primary(self):
self._db.try_catch_up_with_primary()
@property
def closed(self):
return self._db.closed

View file

@ -4,7 +4,7 @@ import array
import base64
from typing import Union, Tuple, NamedTuple, Optional
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.db import KeyValueStorage, PrefixDB
from lbry.wallet.server.db.db import RocksDBStore, PrefixDB
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
from lbry.schema.url import normalize_name
@ -38,7 +38,7 @@ class PrefixRow(metaclass=PrefixRowType):
value_struct: struct.Struct
key_part_lambdas = []
def __init__(self, db: KeyValueStorage, op_stack: RevertableOpStack):
def __init__(self, db: RocksDBStore, op_stack: RevertableOpStack):
self._db = db
self._op_stack = op_stack
@ -1595,40 +1595,10 @@ class BlockTxsPrefixRow(PrefixRow):
return cls.pack_key(height), cls.pack_value(tx_hashes)
class LevelDBStore(KeyValueStorage):
def __init__(self, path: str, cache_mb: int, max_open_files: int):
import plyvel
self.db = plyvel.DB(
path, create_if_missing=True, max_open_files=max_open_files,
lru_cache_size=cache_mb * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
)
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
return self.db.get(key, fill_cache=fill_cache)
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
include_key=True, include_value=True, fill_cache=True):
return self.db.iterator(
reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
prefix=prefix, include_key=include_key, include_value=include_value, fill_cache=fill_cache
)
def write_batch(self, transaction: bool = False, sync: bool = False):
return self.db.write_batch(transaction=transaction, sync=sync)
def close(self):
return self.db.close()
@property
def closed(self) -> bool:
return self.db.closed
class HubDB(PrefixDB):
def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512,
unsafe_prefixes: Optional[typing.Set[bytes]] = None):
db = LevelDBStore(path, cache_mb, max_open_files)
secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None):
db = RocksDBStore(path, cache_mb, max_open_files, secondary_path=secondary_path)
super().__init__(db, reorg_limit, unsafe_prefixes=unsafe_prefixes)
self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack)
self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack)

View file

@ -26,7 +26,10 @@ class Server:
self.prometheus_server: typing.Optional[PrometheusServer] = None
self.session_mgr = LBRYSessionManager(
env, db, bp, daemon, self.shutdown_event
env, db, bp.mempool, bp.history_cache, bp.resolve_cache, bp.resolve_outputs_cache, daemon,
self.shutdown_event,
on_available_callback=bp.status_server.set_available,
on_unavailable_callback=bp.status_server.set_unavailable
)
self._indexer_task = None

View file

@ -170,13 +170,16 @@ class SessionManager:
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
)
def __init__(self, env: 'Env', db: LevelDB, bp: BlockProcessor, daemon: 'Daemon', shutdown_event: asyncio.Event):
def __init__(self, env: 'Env', db: LevelDB, mempool, history_cache, resolve_cache, resolve_outputs_cache,
daemon: 'Daemon', shutdown_event: asyncio.Event,
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
env.max_send = max(350000, env.max_send)
self.env = env
self.db = db
self.bp = bp
self.on_available_callback = on_available_callback
self.on_unavailable_callback = on_unavailable_callback
self.daemon = daemon
self.mempool = bp.mempool
self.mempool = mempool
self.shutdown_event = shutdown_event
self.logger = util.class_logger(__name__, self.__class__.__name__)
self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
@ -186,7 +189,9 @@ class SessionManager:
self.cur_group = SessionGroup(0)
self.txs_sent = 0
self.start_time = time.time()
self.history_cache = self.bp.history_cache
self.history_cache = history_cache
self.resolve_cache = resolve_cache
self.resolve_outputs_cache = resolve_outputs_cache
self.notified_height: typing.Optional[int] = None
# Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0
@ -243,7 +248,7 @@ class SessionManager:
await self.session_event.wait()
self.session_event.clear()
if not paused and len(self.sessions) >= max_sessions:
self.bp.status_server.set_unavailable()
self.on_unavailable_callback()
self.logger.info(f'maximum sessions {max_sessions:,d} '
f'reached, stopping new connections until '
f'count drops to {low_watermark:,d}')
@ -252,7 +257,7 @@ class SessionManager:
# Start listening for incoming connections if paused and
# session count has fallen
if paused and len(self.sessions) <= low_watermark:
self.bp.status_server.set_available()
self.on_available_callback()
self.logger.info('resuming listening for incoming connections')
await self._start_external_servers()
paused = False
@ -533,7 +538,7 @@ class SessionManager:
await self.start_other()
await self._start_external_servers()
server_listening_event.set()
self.bp.status_server.set_available()
self.on_available_callback()
# Peer discovery should start after the external servers
# because we connect to ourself
await asyncio.wait([
@ -628,8 +633,9 @@ class SessionManager:
for hashX in touched.intersection(self.mempool_statuses.keys()):
self.mempool_statuses.pop(hashX, None)
# self.bp._chain_executor
await asyncio.get_event_loop().run_in_executor(
self.bp._chain_executor, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
None, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
)
if touched or new_touched or (height_changed and self.mempool_statuses):
@ -866,8 +872,7 @@ class LBRYElectrumX(SessionBase):
self.protocol_tuple = self.PROTOCOL_MIN
self.protocol_string = None
self.daemon = self.session_mgr.daemon
self.bp: BlockProcessor = self.session_mgr.bp
self.db: LevelDB = self.bp.db
self.db: LevelDB = self.session_mgr.db
@classmethod
def protocol_min_max_strings(cls):
@ -1007,21 +1012,21 @@ class LBRYElectrumX(SessionBase):
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
async def _cached_resolve_url(self, url):
if url not in self.bp.resolve_cache:
self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
return self.bp.resolve_cache[url]
if url not in self.session_mgr.resolve_cache:
self.session_mgr.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
return self.session_mgr.resolve_cache[url]
async def claimtrie_resolve(self, *urls) -> str:
sorted_urls = tuple(sorted(urls))
self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
try:
if sorted_urls in self.bp.resolve_outputs_cache:
return self.bp.resolve_outputs_cache[sorted_urls]
if sorted_urls in self.session_mgr.resolve_outputs_cache:
return self.session_mgr.resolve_outputs_cache[sorted_urls]
rows, extra = [], []
for url in urls:
if url not in self.bp.resolve_cache:
self.bp.resolve_cache[url] = await self._cached_resolve_url(url)
stream, channel, repost, reposted_channel = self.bp.resolve_cache[url]
if url not in self.session_mgr.resolve_cache:
self.session_mgr.resolve_cache[url] = await self._cached_resolve_url(url)
stream, channel, repost, reposted_channel = self.session_mgr.resolve_cache[url]
if isinstance(channel, ResolveCensoredError):
rows.append(channel)
extra.append(channel.censor_row)
@ -1046,7 +1051,7 @@ class LBRYElectrumX(SessionBase):
if reposted_channel:
extra.append(reposted_channel)
await asyncio.sleep(0)
self.bp.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
self.session_mgr.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
None, Outputs.to_base64, rows, extra, 0, None, None
)
return result
@ -1054,7 +1059,7 @@ class LBRYElectrumX(SessionBase):
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
async def get_server_height(self):
return self.bp.height
return self.db.db_height
async def transaction_get_height(self, tx_hash):
self.assert_tx_hash(tx_hash)
@ -1466,7 +1471,7 @@ class LBRYElectrumX(SessionBase):
if mempool_tx:
raw_tx, block_hash = mempool_tx.raw_tx.hex(), None
else:
tx_info = await self.daemon_request('getrawtransaction', tx_hash, True)
tx_info = await self.daemon_request('getrawtransaction', tx_hash, 1)
raw_tx = tx_info['hex']
block_hash = tx_info.get('blockhash')
if block_hash:
@ -1503,7 +1508,7 @@ class LBRYElectrumX(SessionBase):
if verbose not in (True, False):
raise RPCError(BAD_REQUEST, f'"verbose" must be a boolean')
return await self.daemon_request('getrawtransaction', tx_hash, verbose)
return await self.daemon_request('getrawtransaction', tx_hash, int(verbose))
def _get_merkle_branch(self, tx_hashes, tx_pos):
"""Return a merkle branch to a transaction.

View file

@ -40,22 +40,17 @@ def checkrecord(record, expected_winner, expected_claim):
async def checkcontrolling(daemon: Daemon, db: SQLDB):
records, claim_ids, names, futs = [], [], [], []
records, names, futs = [], [], []
for record in db.get_claims('claimtrie.claim_hash as is_controlling, claim.*', is_controlling=True):
records.append(record)
claim_id = hex_reverted(record['claim_hash'])
claim_ids.append((claim_id,))
names.append((record['normalized'],))
names.append((record['normalized'], (claim_id,), "", True)) # last parameter is IncludeValues
if len(names) > 50000:
futs.append(daemon._send_vector('getvalueforname', names[:]))
futs.append(daemon._send_vector('getclaimbyid', claim_ids[:]))
futs.append(daemon._send_vector('getclaimsfornamebyid', names))
names.clear()
claim_ids.clear()
if names:
futs.append(daemon._send_vector('getvalueforname', names[:]))
futs.append(daemon._send_vector('getclaimbyid', claim_ids[:]))
futs.append(daemon._send_vector('getclaimsfornamebyid', names))
names.clear()
claim_ids.clear()
while futs:
winners, claims = futs.pop(0), futs.pop(0)

View file

@ -7,9 +7,11 @@ BASE = os.path.dirname(__file__)
with open(os.path.join(BASE, 'README.md'), encoding='utf-8') as fh:
long_description = fh.read()
PLYVEL = []
if sys.platform.startswith('linux'):
PLYVEL.append('plyvel==1.3.0')
ROCKSDB = []
if sys.platform.startswith('linux') or sys.platform.startswith('darwin'):
ROCKSDB.append('lbry-rocksdb==0.8.1')
setup(
name=__name__,
@ -57,7 +59,7 @@ setup(
'pylru==1.1.0',
'elasticsearch==7.10.1',
'grpcio==1.38.0'
] + PLYVEL,
] + ROCKSDB,
extras_require={
'torrent': ['lbry-libtorrent'],
'lint': ['pylint==2.10.0'],

47
test_rocksdb.py Executable file
View file

@ -0,0 +1,47 @@
#! python
import os
import shutil
import rocksdb
import tempfile
import logging
log = logging.getLogger()
log.addHandler(logging.StreamHandler())
log.setLevel(logging.INFO)
def _main(db_loc):
opts = rocksdb.Options(create_if_missing=True)
db = rocksdb.DB(os.path.join(db_loc, "test"), opts)
secondary_location = os.path.join(db_loc, "secondary")
secondary = rocksdb.DB(
os.path.join(db_loc, "test"),
rocksdb.Options(create_if_missing=True, max_open_files=-1),
secondary_name=secondary_location
)
try:
assert secondary.get(b"a") is None
db.put(b"a", b"b")
assert db.get(b"a") == b"b"
assert secondary.get(b"a") is None
secondary.try_catch_up_with_primary()
assert secondary.get(b"a") == b"b"
finally:
secondary.close()
db.close()
def main():
db_dir = tempfile.mkdtemp()
try:
_main(db_dir)
log.info("rocksdb %s (%s) works!", rocksdb.__version__, rocksdb.ROCKSDB_VERSION)
except:
log.exception("boom")
finally:
shutil.rmtree(db_dir)
if __name__ == "__main__":
main()

View file

@ -112,7 +112,7 @@ class BlockchainReorganizationTests(CommandTestCase):
# reorg the last block dropping our claim tx
await self.blockchain.invalidate_block(invalidated_block_hash)
await self.blockchain.clear_mempool()
await self.conductor.clear_mempool()
await self.blockchain.generate(2)
# wait for the client to catch up and verify the reorg
@ -191,7 +191,7 @@ class BlockchainReorganizationTests(CommandTestCase):
# reorg the last block dropping our claim tx
await self.blockchain.invalidate_block(invalidated_block_hash)
await self.blockchain.clear_mempool()
await self.conductor.clear_mempool()
await self.blockchain.generate(2)
# wait for the client to catch up and verify the reorg

View file

@ -135,7 +135,7 @@ class ReconnectTests(IntegrationTestCase):
await self.conductor.spv_node.stop()
self.assertFalse(self.ledger.network.is_connected)
await asyncio.sleep(0.2) # let it retry and fail once
await self.conductor.spv_node.start(self.conductor.blockchain_node)
await self.conductor.spv_node.start(self.conductor.lbcwallet_node)
await self.ledger.network.on_connected.first
self.assertTrue(self.ledger.network.is_connected)
@ -165,8 +165,10 @@ class UDPServerFailDiscoveryTest(AsyncioTestCase):
async def test_wallet_connects_despite_lack_of_udp(self):
conductor = Conductor()
conductor.spv_node.udp_port = '0'
await conductor.start_blockchain()
self.addCleanup(conductor.stop_blockchain)
await conductor.start_lbcd()
self.addCleanup(conductor.stop_lbcd)
await conductor.start_lbcwallet()
self.addCleanup(conductor.stop_lbcwallet)
await conductor.start_spv()
self.addCleanup(conductor.stop_spv)
self.assertFalse(conductor.spv_node.server.bp.status_server.is_running)

View file

@ -48,7 +48,7 @@ class WalletCommands(CommandTestCase):
self.assertEqual(status['wallet']['servers'][0]['port'], 50002)
await self.conductor.spv_node.stop(True)
self.conductor.spv_node.port = 54320
await self.conductor.spv_node.start(self.conductor.blockchain_node)
await self.conductor.spv_node.start(self.conductor.lbcwallet_node)
status = await self.daemon.jsonrpc_status()
self.assertEqual(len(status['wallet']['servers']), 0)
self.daemon.jsonrpc_settings_set('lbryum_servers', ['localhost:54320'])
@ -58,15 +58,15 @@ class WalletCommands(CommandTestCase):
self.assertEqual(status['wallet']['servers'][0]['port'], 54320)
async def test_sending_to_scripthash_address(self):
self.assertEqual(await self.blockchain.get_balance(), '95.99973580')
bal = await self.blockchain.get_balance()
await self.assertBalance(self.account, '10.0')
p2sh_address1 = await self.blockchain.get_new_address(self.blockchain.P2SH_SEGWIT_ADDRESS)
tx = await self.account_send('2.0', p2sh_address1)
self.assertEqual(tx['outputs'][0]['address'], p2sh_address1)
self.assertEqual(await self.blockchain.get_balance(), '98.99973580') # +1 lbc for confirm block
self.assertEqual(await self.blockchain.get_balance(), str(float(bal)+3)) # +1 lbc for confirm block
await self.assertBalance(self.account, '7.999877')
await self.wallet_send('3.0', p2sh_address1)
self.assertEqual(await self.blockchain.get_balance(), '102.99973580') # +1 lbc for confirm block
self.assertEqual(await self.blockchain.get_balance(), str(float(bal)+7)) # +1 lbc for confirm block
await self.assertBalance(self.account, '4.999754')
async def test_balance_caching(self):

View file

@ -1553,7 +1553,7 @@ class StreamCommands(ClaimTestCase):
)
# test setting from env vars and starting from scratch
await self.conductor.spv_node.stop(False)
await self.conductor.spv_node.start(self.conductor.blockchain_node,
await self.conductor.spv_node.start(self.conductor.lbcwallet_node,
extraconf={'BLOCKING_CHANNEL_IDS': blocking_channel_id,
'FILTERING_CHANNEL_IDS': filtering_channel_id})
await self.daemon.wallet_manager.reset()

View file

@ -31,13 +31,13 @@ class BaseResolveTestCase(CommandTestCase):
self.assertEqual(claim_from_es['effective_amount'], claim_from_db.effective_amount)
def assertMatchDBClaim(self, expected, claim):
self.assertEqual(expected['claimId'], claim.claim_hash.hex())
self.assertEqual(expected['validAtHeight'], claim.activation_height)
self.assertEqual(expected['lastTakeoverHeight'], claim.last_takeover_height)
self.assertEqual(expected['txId'], claim.tx_hash[::-1].hex())
self.assertEqual(expected['claimid'], claim.claim_hash.hex())
self.assertEqual(expected['validatheight'], claim.activation_height)
self.assertEqual(expected['lasttakeoverheight'], claim.last_takeover_height)
self.assertEqual(expected['txid'], claim.tx_hash[::-1].hex())
self.assertEqual(expected['n'], claim.position)
self.assertEqual(expected['amount'], claim.amount)
self.assertEqual(expected['effectiveAmount'], claim.effective_amount)
self.assertEqual(expected['effectiveamount'], claim.effective_amount)
async def assertResolvesToClaimId(self, name, claim_id):
other = await self.resolve(name)
@ -53,9 +53,10 @@ class BaseResolveTestCase(CommandTestCase):
self.assertEqual(claim_id, claim_from_es[0][0]['claim_hash'][::-1].hex())
async def assertNoClaimForName(self, name: str):
lbrycrd_winning = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))
lbrycrd_winning = json.loads(await self.blockchain._cli_cmnd('getclaimsforname', name))
stream, channel, _, _ = await self.conductor.spv_node.server.bp.db.resolve(name)
self.assertNotIn('claimId', lbrycrd_winning)
if 'claims' in lbrycrd_winning and lbrycrd_winning['claims'] is not None:
self.assertEqual(len(lbrycrd_winning['claims']), 0)
if stream is not None:
self.assertIsInstance(stream, LookupError)
else:
@ -63,20 +64,23 @@ class BaseResolveTestCase(CommandTestCase):
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(name=name)
self.assertListEqual([], claim_from_es[0])
async def assertNoClaim(self, claim_id: str):
self.assertDictEqual(
{}, json.loads(await self.blockchain._cli_cmnd('getclaimbyid', claim_id))
)
async def assertNoClaim(self, name: str, claim_id: str):
expected = json.loads(await self.blockchain._cli_cmnd('getclaimsfornamebyid', name, '["' + claim_id + '"]'))
if 'claims' in expected and expected['claims'] is not None:
# ensure that if we do have the matching claim that it is not active
self.assertEqual(expected['claims'][0]['effectiveamount'], 0)
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(claim_id=claim_id)
self.assertListEqual([], claim_from_es[0])
claim = await self.conductor.spv_node.server.bp.db.fs_getclaimbyid(claim_id)
self.assertIsNone(claim)
async def assertMatchWinningClaim(self, name):
expected = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))
expected = json.loads(await self.blockchain._cli_cmnd('getclaimsfornamebybid', name, "[0]"))
stream, channel, _, _ = await self.conductor.spv_node.server.bp.db.resolve(name)
claim = stream if stream else channel
await self._assertMatchClaim(expected, claim)
expected['claims'][0]['lasttakeoverheight'] = expected['lasttakeoverheight']
await self._assertMatchClaim(expected['claims'][0], claim)
return claim
async def _assertMatchClaim(self, expected, claim):
@ -86,28 +90,31 @@ class BaseResolveTestCase(CommandTestCase):
)
self.assertEqual(len(claim_from_es[0]), 1)
self.assertMatchESClaim(claim_from_es[0][0], claim)
self._check_supports(claim.claim_hash.hex(), expected['supports'], claim_from_es[0][0]['support_amount'])
self._check_supports(claim.claim_hash.hex(), expected.get('supports', []),
claim_from_es[0][0]['support_amount'], expected['effectiveamount'] > 0)
async def assertMatchClaim(self, claim_id, is_active_in_lbrycrd=True):
expected = json.loads(await self.blockchain._cli_cmnd('getclaimbyid', claim_id))
async def assertMatchClaim(self, name, claim_id, is_active_in_lbrycrd=True):
claim = await self.conductor.spv_node.server.bp.db.fs_getclaimbyid(claim_id)
if is_active_in_lbrycrd:
if not expected:
self.assertIsNone(claim)
return
self.assertMatchDBClaim(expected, claim)
else:
self.assertDictEqual({}, expected)
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(
claim_id=claim.claim_hash.hex()
)
self.assertEqual(len(claim_from_es[0]), 1)
self.assertEqual(claim_from_es[0][0]['claim_hash'][::-1].hex(), claim.claim_hash.hex())
self.assertMatchESClaim(claim_from_es[0][0], claim)
self._check_supports(
claim.claim_hash.hex(), expected.get('supports', []), claim_from_es[0][0]['support_amount'],
is_active_in_lbrycrd
)
expected = json.loads(await self.blockchain._cli_cmnd('getclaimsfornamebyid', name, '["' + claim_id + '"]'))
if is_active_in_lbrycrd:
if not expected:
self.assertIsNone(claim)
return
expected['claims'][0]['lasttakeoverheight'] = expected['lasttakeoverheight']
self.assertMatchDBClaim(expected['claims'][0], claim)
self._check_supports(claim.claim_hash.hex(), expected['claims'][0].get('supports', []),
claim_from_es[0][0]['support_amount'], is_active_in_lbrycrd)
else:
if 'claims' in expected and expected['claims'] is not None:
# ensure that if we do have the matching claim that it is not active
self.assertEqual(expected['claims'][0]['effectiveamount'], 0)
return claim
async def assertMatchClaimIsWinning(self, name, claim_id):
@ -122,34 +129,31 @@ class BaseResolveTestCase(CommandTestCase):
total_amount += amount
if is_active_in_lbrycrd:
support = lbrycrd_supports[i]
self.assertEqual(support['txId'], db.prefix_db.tx_hash.get(tx_num, deserialize_value=False)[::-1].hex())
self.assertEqual(support['txid'], db.prefix_db.tx_hash.get(tx_num, deserialize_value=False)[::-1].hex())
self.assertEqual(support['n'], position)
self.assertEqual(support['height'], bisect_right(db.tx_counts, tx_num))
self.assertEqual(support['validAtHeight'], db.get_activation(tx_num, position, is_support=True))
self.assertEqual(support['validatheight'], db.get_activation(tx_num, position, is_support=True))
self.assertEqual(total_amount, es_support_amount, f"lbrycrd support amount: {total_amount} vs es: {es_support_amount}")
async def assertMatchClaimsForName(self, name):
expected = json.loads(await self.blockchain._cli_cmnd('getclaimsforname', name))
expected = json.loads(await self.blockchain._cli_cmnd('getclaimsforname', name, "", "true"))
db = self.conductor.spv_node.server.bp.db
# self.assertEqual(len(expected['claims']), len(db_claims.claims))
# self.assertEqual(expected['lastTakeoverHeight'], db_claims.lastTakeoverHeight)
last_takeover = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))['lastTakeoverHeight']
for c in expected['claims']:
c['lastTakeoverHeight'] = last_takeover
claim_id = c['claimId']
c['lasttakeoverheight'] = expected['lasttakeoverheight']
claim_id = c['claimid']
claim_hash = bytes.fromhex(claim_id)
claim = db._fs_get_claim_by_hash(claim_hash)
self.assertMatchDBClaim(c, claim)
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(
claim_id=c['claimId']
claim_id=claim_id
)
self.assertEqual(len(claim_from_es[0]), 1)
self.assertEqual(claim_from_es[0][0]['claim_hash'][::-1].hex(), c['claimId'])
self.assertEqual(claim_from_es[0][0]['claim_hash'][::-1].hex(), claim_id)
self.assertMatchESClaim(claim_from_es[0][0], claim)
self._check_supports(c['claimId'], c['supports'], claim_from_es[0][0]['support_amount'])
self._check_supports(claim_id, c.get('supports', []),
claim_from_es[0][0]['support_amount'], c['effectiveamount'] > 0)
class ResolveCommand(BaseResolveTestCase):
@ -262,18 +266,18 @@ class ResolveCommand(BaseResolveTestCase):
self.assertEqual(claim['confirmations'], json.loads(tx_details)['confirmations'])
# resolve handles invalid data
await self.blockchain_claim_name("gibberish", hexlify(b"{'invalid':'json'}").decode(), "0.1")
await self.generate(1)
response = await self.out(self.daemon.jsonrpc_resolve("lbry://gibberish"))
self.assertSetEqual({'lbry://gibberish'}, set(response))
claim = response['lbry://gibberish']
self.assertEqual(claim['name'], 'gibberish')
self.assertNotIn('value', claim)
# await self.blockchain_claim_name("gibberish", hexlify(b"{'invalid':'json'}").decode(), "0.1")
# await self.generate(1)
# response = await self.out(self.daemon.jsonrpc_resolve("lbry://gibberish"))
# self.assertSetEqual({'lbry://gibberish'}, set(response))
# claim = response['lbry://gibberish']
# self.assertEqual(claim['name'], 'gibberish')
# self.assertNotIn('value', claim)
# resolve retries
await self.conductor.spv_node.stop()
resolve_task = asyncio.create_task(self.resolve('foo'))
await self.conductor.spv_node.start(self.conductor.blockchain_node)
await self.conductor.spv_node.start(self.conductor.lbcwallet_node)
self.assertIsNotNone((await resolve_task)['claim_id'])
async def test_winning_by_effective_amount(self):
@ -646,7 +650,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
self.assertEqual(height, self.conductor.spv_node.server.bp.db.db_height)
await self.assertMatchClaimIsWinning(name, winning_claim_id)
for non_winning in non_winning_claims:
claim = await self.assertMatchClaim(
claim = await self.assertMatchClaim(name,
non_winning.claim_id, is_active_in_lbrycrd=non_winning.active_in_lbrycrd
)
self.assertEqual(non_winning.activation_height, claim.activation_height)
@ -1334,7 +1338,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
await self.generate(1)
await self.assertMatchClaim(first_claim_id)
await self.assertMatchClaim(name, first_claim_id)
await self.assertMatchClaimIsWinning(name, second_claim_id)
async def test_remove_controlling_support(self):
@ -1405,12 +1409,12 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
await self.generate(32)
second_claim_id = (await self.stream_create(name, '0.01', allow_duplicate_name=True))['outputs'][0]['claim_id']
await self.assertNoClaim(second_claim_id)
await self.assertNoClaim(name, second_claim_id)
self.assertEqual(
len((await self.conductor.spv_node.server.bp.db.search_index.search(claim_name=name))[0]), 1
)
await self.generate(1)
await self.assertMatchClaim(second_claim_id)
await self.assertMatchClaim(name, second_claim_id)
self.assertEqual(
len((await self.conductor.spv_node.server.bp.db.search_index.search(claim_name=name))[0]), 2
)
@ -1570,7 +1574,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
# reorg the last block dropping our claim tx
await self.blockchain.invalidate_block(invalidated_block_hash)
await self.blockchain.clear_mempool()
await self.conductor.clear_mempool()
await self.blockchain.generate(2)
# wait for the client to catch up and verify the reorg
@ -1649,7 +1653,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
# reorg the last block dropping our claim tx
await self.blockchain.invalidate_block(invalidated_block_hash)
await self.blockchain.clear_mempool()
await self.conductor.clear_mempool()
await self.blockchain.generate(2)
# wait for the client to catch up and verify the reorg

View file

@ -60,7 +60,7 @@ class TestSegwit(CommandTestCase):
tx = await self.blockchain.create_raw_transaction([
{"txid": p2sh_txid1, "vout": 0},
{"txid": bech32_txid1, "vout": 0},
], [{p2sh_address3: '1.9'}]
], {p2sh_address3: 1.9}
)
tx = await self.blockchain.sign_raw_transaction_with_wallet(tx)
p2sh_txid3 = await self.blockchain.send_raw_transaction(tx)
@ -71,7 +71,7 @@ class TestSegwit(CommandTestCase):
tx = await self.blockchain.create_raw_transaction([
{"txid": p2sh_txid2, "vout": 0},
{"txid": bech32_txid2, "vout": 0},
], [{bech32_address3: '1.9'}]
], {bech32_address3: 1.9}
)
tx = await self.blockchain.sign_raw_transaction_with_wallet(tx)
bech32_txid3 = await self.blockchain.send_raw_transaction(tx)
@ -83,7 +83,7 @@ class TestSegwit(CommandTestCase):
tx = await self.blockchain.create_raw_transaction([
{"txid": p2sh_txid3, "vout": 0},
{"txid": bech32_txid3, "vout": 0},
], [{address: '3.5'}]
], {address: 3.5}
)
tx = await self.blockchain.sign_raw_transaction_with_wallet(tx)
txid = await self.blockchain.send_raw_transaction(tx)