Compare commits


14 commits

Author SHA1 Message Date
Roy Lee
09159fbd03 update lbcd and lbcwallet version 2021-11-30 23:42:22 -08:00
Brannon King
c27d9c50bd fixing a variety of broken tests, updated getclaimbyid usage 2021-11-26 11:45:40 -05:00
Roy Lee
a719c06615 test: lbcd doesn't have a dedicated mempool.dat 2021-11-15 12:41:29 -08:00
Roy Lee
79b80d9bbf test: update lbcd to avoid txn being rejected due to minimum fee 2021-11-15 12:41:29 -08:00
Roy Lee
f2fbf23886 HACK: temporary hack to move things along 2021-11-15 12:41:29 -08:00
Roy Lee
caa1b44492 add a rocksdb setup sanity check 2021-11-15 12:41:29 -08:00
Roy Lee
294da1b5aa test: migrate from lbrycrd to lbcd/lbcwallet 2021-11-15 12:41:29 -08:00
Roy Lee
fe06df5ec9 test: support generatetoaddress RPC 2021-11-15 12:41:29 -08:00
Roy Lee
ea6b22f8ef test: support walletpassphrase RPC 2021-11-15 12:41:29 -08:00
Roy Lee
85a96e738c test: getnewaddress RPC takes account name 2021-11-15 12:41:29 -08:00
Roy Lee
89db9f143c test: update some RPC arguments to int type 2021-11-15 12:41:29 -08:00
Roy Lee
56d42cc4e4 test: setup loggers first 2021-11-15 12:41:29 -08:00
Roy Lee
73a09e30d4 Update log level from WARN to INFO 2021-11-15 12:41:29 -08:00
Jack Robison
3f1258ee5d
use rocksdb instead of leveldb
- don't use block processor directly from session manager
2021-11-15 14:27:58 -05:00
20 changed files with 582 additions and 285 deletions

View file

@ -15,7 +15,6 @@ RUN apt-get update && \
build-essential \ build-essential \
automake libtool \ automake libtool \
pkg-config \ pkg-config \
libleveldb-dev \
python3.7 \ python3.7 \
python3-dev \ python3-dev \
python3-pip \ python3-pip \

View file

@ -18,7 +18,7 @@ from lbry.wallet import WalletManager, Wallet, Ledger, Account, Transaction
from lbry.conf import Config from lbry.conf import Config
from lbry.wallet.util import satoshis_to_coins from lbry.wallet.util import satoshis_to_coins
from lbry.wallet.orchstr8 import Conductor from lbry.wallet.orchstr8 import Conductor
from lbry.wallet.orchstr8.node import BlockchainNode, WalletNode, HubNode from lbry.wallet.orchstr8.node import LBCWalletNode, WalletNode, HubNode
from lbry.extras.daemon.daemon import Daemon, jsonrpc_dumps_pretty from lbry.extras.daemon.daemon import Daemon, jsonrpc_dumps_pretty
from lbry.extras.daemon.components import Component, WalletComponent from lbry.extras.daemon.components import Component, WalletComponent
@ -222,7 +222,7 @@ class IntegrationTestCase(AsyncioTestCase):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.conductor: Optional[Conductor] = None self.conductor: Optional[Conductor] = None
self.blockchain: Optional[BlockchainNode] = None self.blockchain: Optional[LBCWalletNode] = None
self.hub: Optional[HubNode] = None self.hub: Optional[HubNode] = None
self.wallet_node: Optional[WalletNode] = None self.wallet_node: Optional[WalletNode] = None
self.manager: Optional[WalletManager] = None self.manager: Optional[WalletManager] = None
@ -232,15 +232,17 @@ class IntegrationTestCase(AsyncioTestCase):
async def asyncSetUp(self): async def asyncSetUp(self):
self.conductor = Conductor(seed=self.SEED) self.conductor = Conductor(seed=self.SEED)
await self.conductor.start_blockchain() await self.conductor.start_lbcd()
self.addCleanup(self.conductor.stop_blockchain) self.addCleanup(self.conductor.stop_lbcd)
await self.conductor.start_lbcwallet()
self.addCleanup(self.conductor.stop_lbcwallet)
await self.conductor.start_spv() await self.conductor.start_spv()
self.addCleanup(self.conductor.stop_spv) self.addCleanup(self.conductor.stop_spv)
await self.conductor.start_wallet() await self.conductor.start_wallet()
self.addCleanup(self.conductor.stop_wallet) self.addCleanup(self.conductor.stop_wallet)
await self.conductor.start_hub() await self.conductor.start_hub()
self.addCleanup(self.conductor.stop_hub) self.addCleanup(self.conductor.stop_hub)
self.blockchain = self.conductor.blockchain_node self.blockchain = self.conductor.lbcwallet_node
self.hub = self.conductor.hub_node self.hub = self.conductor.hub_node
self.wallet_node = self.conductor.wallet_node self.wallet_node = self.conductor.wallet_node
self.manager = self.wallet_node.manager self.manager = self.wallet_node.manager
@ -320,7 +322,7 @@ class ExchangeRateManagerComponent(Component):
class CommandTestCase(IntegrationTestCase): class CommandTestCase(IntegrationTestCase):
VERBOSITY = logging.WARN VERBOSITY = logging.INFO
blob_lru_cache_size = 0 blob_lru_cache_size = 0
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
@ -337,13 +339,14 @@ class CommandTestCase(IntegrationTestCase):
self.skip_libtorrent = True self.skip_libtorrent = True
async def asyncSetUp(self): async def asyncSetUp(self):
await super().asyncSetUp()
logging.getLogger('lbry.blob_exchange').setLevel(self.VERBOSITY) logging.getLogger('lbry.blob_exchange').setLevel(self.VERBOSITY)
logging.getLogger('lbry.daemon').setLevel(self.VERBOSITY) logging.getLogger('lbry.daemon').setLevel(self.VERBOSITY)
logging.getLogger('lbry.stream').setLevel(self.VERBOSITY) logging.getLogger('lbry.stream').setLevel(self.VERBOSITY)
logging.getLogger('lbry.wallet').setLevel(self.VERBOSITY) logging.getLogger('lbry.wallet').setLevel(self.VERBOSITY)
await super().asyncSetUp()
self.daemon = await self.add_daemon(self.wallet_node) self.daemon = await self.add_daemon(self.wallet_node)
await self.account.ensure_address_gap() await self.account.ensure_address_gap()

View file

@ -1,8 +1,13 @@
__node_daemon__ = 'lbrycrdd' __lbcd__ = 'lbcd'
__node_cli__ = 'lbrycrd-cli' __lbcctl__ = 'lbcctl'
__node_bin__ = '' __lbcwallet__ = 'lbcwallet'
__node_url__ = ( __lbcd_url__ = (
'https://github.com/lbryio/lbrycrd/releases/download/v0.17.4.6/lbrycrd-linux-1746.zip' 'https://github.com/lbryio/lbcd/releases/download/' +
'v0.22.100-beta-rc4/lbcd_0.22.100-beta-rc4_TARGET_PLATFORM.tar.gz'
)
__lbcwallet_url__ = (
'https://github.com/lbryio/lbcwallet/releases/download/' +
'v0.12.100-alpha-rc2/lbcwallet_0.12.100-alpha-rc2_TARGET_PLATFORM.tar.gz'
) )
__spvserver__ = 'lbry.wallet.server.coin.LBCRegTest' __spvserver__ = 'lbry.wallet.server.coin.LBCRegTest'

View file

@ -5,7 +5,9 @@ import aiohttp
from lbry import wallet from lbry import wallet
from lbry.wallet.orchstr8.node import ( from lbry.wallet.orchstr8.node import (
Conductor, get_blockchain_node_from_ledger Conductor,
get_lbcd_node_from_ledger,
get_lbcwallet_node_from_ledger
) )
from lbry.wallet.orchstr8.service import ConductorService from lbry.wallet.orchstr8.service import ConductorService
@ -16,10 +18,11 @@ def get_argument_parser():
) )
subparsers = parser.add_subparsers(dest='command', help='sub-command help') subparsers = parser.add_subparsers(dest='command', help='sub-command help')
subparsers.add_parser("download", help="Download blockchain node binary.") subparsers.add_parser("download", help="Download lbcd and lbcwallet node binaries.")
start = subparsers.add_parser("start", help="Start orchstr8 service.") start = subparsers.add_parser("start", help="Start orchstr8 service.")
start.add_argument("--blockchain", help="Hostname to start blockchain node.") start.add_argument("--lbcd", help="Hostname to start lbcd node.")
start.add_argument("--lbcwallet", help="Hostname to start lbcwallet node.")
start.add_argument("--spv", help="Hostname to start SPV server.") start.add_argument("--spv", help="Hostname to start SPV server.")
start.add_argument("--wallet", help="Hostname to start wallet daemon.") start.add_argument("--wallet", help="Hostname to start wallet daemon.")
@ -47,7 +50,8 @@ def main():
if command == 'download': if command == 'download':
logging.getLogger('blockchain').setLevel(logging.INFO) logging.getLogger('blockchain').setLevel(logging.INFO)
get_blockchain_node_from_ledger(wallet).ensure() get_lbcd_node_from_ledger(wallet).ensure()
get_lbcwallet_node_from_ledger(wallet).ensure()
elif command == 'generate': elif command == 'generate':
loop.run_until_complete(run_remote_command( loop.run_until_complete(run_remote_command(
@ -57,9 +61,12 @@ def main():
elif command == 'start': elif command == 'start':
conductor = Conductor() conductor = Conductor()
if getattr(args, 'blockchain', False): if getattr(args, 'lbcd', False):
conductor.blockchain_node.hostname = args.blockchain conductor.lbcd_node.hostname = args.lbcd
loop.run_until_complete(conductor.start_blockchain()) loop.run_until_complete(conductor.start_lbcd())
if getattr(args, 'lbcwallet', False):
conductor.lbcwallet_node.hostname = args.lbcwallet
loop.run_until_complete(conductor.start_lbcwallet())
if getattr(args, 'spv', False): if getattr(args, 'spv', False):
conductor.spv_node.hostname = args.spv conductor.spv_node.hostname = args.spv
loop.run_until_complete(conductor.start_spv()) loop.run_until_complete(conductor.start_spv())

View file

@ -8,6 +8,7 @@ import logging
import tempfile import tempfile
import subprocess import subprocess
import importlib import importlib
import platform
from distutils.util import strtobool from distutils.util import strtobool
from binascii import hexlify from binascii import hexlify
@ -31,11 +32,18 @@ def get_spvserver_from_ledger(ledger_module):
return getattr(spvserver_module, regtest_class_name) return getattr(spvserver_module, regtest_class_name)
def get_blockchain_node_from_ledger(ledger_module): def get_lbcd_node_from_ledger(ledger_module):
return BlockchainNode( return LBCDNode(
ledger_module.__node_url__, ledger_module.__lbcd_url__,
os.path.join(ledger_module.__node_bin__, ledger_module.__node_daemon__), ledger_module.__lbcd__,
os.path.join(ledger_module.__node_bin__, ledger_module.__node_cli__) ledger_module.__lbcctl__
)
def get_lbcwallet_node_from_ledger(ledger_module):
return LBCWalletNode(
ledger_module.__lbcwallet_url__,
ledger_module.__lbcwallet__,
ledger_module.__lbcctl__
) )
@ -45,51 +53,52 @@ class Conductor:
self.manager_module = WalletManager self.manager_module = WalletManager
self.spv_module = get_spvserver_from_ledger(lbry.wallet) self.spv_module = get_spvserver_from_ledger(lbry.wallet)
self.blockchain_node = get_blockchain_node_from_ledger(lbry.wallet) self.lbcd_node = get_lbcd_node_from_ledger(lbry.wallet)
self.lbcwallet_node = get_lbcwallet_node_from_ledger(lbry.wallet)
self.spv_node = SPVNode(self.spv_module) self.spv_node = SPVNode(self.spv_module)
self.wallet_node = WalletNode( self.wallet_node = WalletNode(
self.manager_module, RegTestLedger, default_seed=seed self.manager_module, RegTestLedger, default_seed=seed
) )
self.hub_node = HubNode(__hub_url__, "hub", self.spv_node) self.hub_node = HubNode(__hub_url__, "hub", self.spv_node)
self.blockchain_started = False self.lbcd_started = False
self.lbcwallet_started = False
self.spv_started = False self.spv_started = False
self.wallet_started = False self.wallet_started = False
self.hub_started = False self.hub_started = False
self.log = log.getChild('conductor') self.log = log.getChild('conductor')
async def start_blockchain(self): async def start_lbcd(self):
if not self.blockchain_started: if not self.lbcd_started:
asyncio.create_task(self.blockchain_node.start()) asyncio.create_task(self.lbcd_node.start())
await self.blockchain_node.running.wait() await self.lbcd_node.running.wait()
await self.blockchain_node.generate(200) self.lbcd_started = True
self.blockchain_started = True
async def stop_blockchain(self): async def stop_lbcd(self, cleanup=True):
if self.blockchain_started: if self.lbcd_started:
await self.blockchain_node.stop(cleanup=True) await self.lbcd_node.stop(cleanup)
self.blockchain_started = False self.lbcd_started = False
async def start_hub(self): async def start_hub(self):
if not self.hub_started: if not self.hub_started:
asyncio.create_task(self.hub_node.start()) asyncio.create_task(self.hub_node.start())
await self.blockchain_node.running.wait() await self.lbcwallet_node.running.wait()
self.hub_started = True self.hub_started = True
async def stop_hub(self): async def stop_hub(self, cleanup=True):
if self.hub_started: if self.hub_started:
await self.hub_node.stop(cleanup=True) await self.hub_node.stop(cleanup)
self.hub_started = False self.hub_started = False
async def start_spv(self): async def start_spv(self):
if not self.spv_started: if not self.spv_started:
await self.spv_node.start(self.blockchain_node) await self.spv_node.start(self.lbcwallet_node)
self.spv_started = True self.spv_started = True
async def stop_spv(self): async def stop_spv(self, cleanup=True):
if self.spv_started: if self.spv_started:
await self.spv_node.stop(cleanup=True) await self.spv_node.stop(cleanup)
self.spv_started = False self.spv_started = False
async def start_wallet(self): async def start_wallet(self):
@ -97,13 +106,31 @@ class Conductor:
await self.wallet_node.start(self.spv_node) await self.wallet_node.start(self.spv_node)
self.wallet_started = True self.wallet_started = True
async def stop_wallet(self): async def stop_wallet(self, cleanup=True):
if self.wallet_started: if self.wallet_started:
await self.wallet_node.stop(cleanup=True) await self.wallet_node.stop(cleanup)
self.wallet_started = False self.wallet_started = False
async def start_lbcwallet(self, clean=True):
if not self.lbcwallet_started:
asyncio.create_task(self.lbcwallet_node.start())
await self.lbcwallet_node.running.wait()
if clean:
mining_addr = await self.lbcwallet_node.get_new_address()
self.lbcwallet_node.mining_addr = mining_addr
await self.lbcwallet_node.generate(200)
# unlock the wallet for the next 1 hour
await self.lbcwallet_node.wallet_passphrase("password", 3600)
self.lbcwallet_started = True
async def stop_lbcwallet(self, cleanup=True):
if self.lbcwallet_started:
await self.lbcwallet_node.stop(cleanup)
self.lbcwallet_started = False
async def start(self): async def start(self):
await self.start_blockchain() await self.start_lbcd()
await self.start_lbcwallet()
await self.start_spv() await self.start_spv()
await self.start_wallet() await self.start_wallet()
@ -111,7 +138,8 @@ class Conductor:
all_the_stops = [ all_the_stops = [
self.stop_wallet, self.stop_wallet,
self.stop_spv, self.stop_spv,
self.stop_blockchain self.stop_lbcwallet,
self.stop_lbcd
] ]
for stop in all_the_stops: for stop in all_the_stops:
try: try:
@ -119,6 +147,11 @@ class Conductor:
except Exception as e: except Exception as e:
log.exception('Exception raised while stopping services:', exc_info=e) log.exception('Exception raised while stopping services:', exc_info=e)
async def clear_mempool(self):
await self.stop_lbcwallet(cleanup=False)
await self.stop_lbcd(cleanup=False)
await self.start_lbcd()
await self.start_lbcwallet(clean=False)
class WalletNode: class WalletNode:
@ -139,10 +172,11 @@ class WalletNode:
async def start(self, spv_node: 'SPVNode', seed=None, connect=True, config=None): async def start(self, spv_node: 'SPVNode', seed=None, connect=True, config=None):
wallets_dir = os.path.join(self.data_path, 'wallets') wallets_dir = os.path.join(self.data_path, 'wallets')
os.mkdir(wallets_dir)
wallet_file_name = os.path.join(wallets_dir, 'my_wallet.json') wallet_file_name = os.path.join(wallets_dir, 'my_wallet.json')
with open(wallet_file_name, 'w') as wallet_file: if not os.path.isdir(wallets_dir):
wallet_file.write('{"version": 1, "accounts": []}\n') os.mkdir(wallets_dir)
with open(wallet_file_name, 'w') as wallet_file:
wallet_file.write('{"version": 1, "accounts": []}\n')
self.manager = self.manager_class.from_config({ self.manager = self.manager_class.from_config({
'ledgers': { 'ledgers': {
self.ledger_class.get_id(): { self.ledger_class.get_id(): {
@ -198,14 +232,14 @@ class SPVNode:
self.stopped = False self.stopped = False
self.index_name = uuid4().hex self.index_name = uuid4().hex
async def start(self, blockchain_node: 'BlockchainNode', extraconf=None): async def start(self, lbcwallet_node: 'LBCWalletNode', extraconf=None):
self.data_path = tempfile.mkdtemp() self.data_path = tempfile.mkdtemp()
conf = { conf = {
'DESCRIPTION': '', 'DESCRIPTION': '',
'PAYMENT_ADDRESS': '', 'PAYMENT_ADDRESS': '',
'DAILY_FEE': '0', 'DAILY_FEE': '0',
'DB_DIRECTORY': self.data_path, 'DB_DIRECTORY': self.data_path,
'DAEMON_URL': blockchain_node.rpc_url, 'DAEMON_URL': lbcwallet_node.rpc_url,
'REORG_LIMIT': '100', 'REORG_LIMIT': '100',
'HOST': self.hostname, 'HOST': self.hostname,
'TCP_PORT': str(self.port), 'TCP_PORT': str(self.port),
@ -240,18 +274,19 @@ class SPVNode:
shutil.rmtree(self.data_path, ignore_errors=True) shutil.rmtree(self.data_path, ignore_errors=True)
class BlockchainProcess(asyncio.SubprocessProtocol): class LBCDProcess(asyncio.SubprocessProtocol):
IGNORE_OUTPUT = [ IGNORE_OUTPUT = [
b'keypool keep', b'keypool keep',
b'keypool reserve', b'keypool reserve',
b'keypool return', b'keypool return',
b'Block submitted',
] ]
def __init__(self): def __init__(self):
self.ready = asyncio.Event() self.ready = asyncio.Event()
self.stopped = asyncio.Event() self.stopped = asyncio.Event()
self.log = log.getChild('blockchain') self.log = log.getChild('lbcd')
def pipe_data_received(self, fd, data): def pipe_data_received(self, fd, data):
if self.log and not any(ignore in data for ignore in self.IGNORE_OUTPUT): if self.log and not any(ignore in data for ignore in self.IGNORE_OUTPUT):
@ -262,7 +297,7 @@ class BlockchainProcess(asyncio.SubprocessProtocol):
if b'Error:' in data: if b'Error:' in data:
self.ready.set() self.ready.set()
raise SystemError(data.decode()) raise SystemError(data.decode())
if b'Done loading' in data: if b'RPCS: RPC server listening on' in data:
self.ready.set() self.ready.set()
def process_exited(self): def process_exited(self):
@ -270,10 +305,34 @@ class BlockchainProcess(asyncio.SubprocessProtocol):
self.ready.set() self.ready.set()
class BlockchainNode: class WalletProcess(asyncio.SubprocessProtocol):
P2SH_SEGWIT_ADDRESS = "p2sh-segwit" IGNORE_OUTPUT = [
BECH32_ADDRESS = "bech32" ]
def __init__(self):
self.ready = asyncio.Event()
self.stopped = asyncio.Event()
self.log = log.getChild('lbcwallet')
self.transport: Optional[asyncio.transports.SubprocessTransport] = None
def pipe_data_received(self, fd, data):
if self.log and not any(ignore in data for ignore in self.IGNORE_OUTPUT):
if b'Error:' in data:
self.log.error(data.decode())
else:
self.log.info(data.decode())
if b'Error:' in data:
self.ready.set()
raise SystemError(data.decode())
if b'WLLT: Finished rescan' in data:
self.ready.set()
def process_exited(self):
self.stopped.set()
self.ready.set()
class LBCDNode:
def __init__(self, url, daemon, cli): def __init__(self, url, daemon, cli):
self.latest_release_url = url self.latest_release_url = url
@ -281,28 +340,22 @@ class BlockchainNode:
self.bin_dir = os.path.join(self.project_dir, 'bin') self.bin_dir = os.path.join(self.project_dir, 'bin')
self.daemon_bin = os.path.join(self.bin_dir, daemon) self.daemon_bin = os.path.join(self.bin_dir, daemon)
self.cli_bin = os.path.join(self.bin_dir, cli) self.cli_bin = os.path.join(self.bin_dir, cli)
self.log = log.getChild('blockchain') self.log = log.getChild('lbcd')
self.data_path = None self.data_path = tempfile.mkdtemp()
self.protocol = None self.protocol = None
self.transport = None self.transport = None
self.block_expected = 0
self.hostname = 'localhost' self.hostname = 'localhost'
self.peerport = 9246 + 2 # avoid conflict with default peer port self.peerport = 29246
self.rpcport = 9245 + 2 # avoid conflict with default rpc port self.rpcport = 29245
self.rpcuser = 'rpcuser' self.rpcuser = 'rpcuser'
self.rpcpassword = 'rpcpassword' self.rpcpassword = 'rpcpassword'
self.stopped = False self.stopped = True
self.restart_ready = asyncio.Event()
self.restart_ready.set()
self.running = asyncio.Event() self.running = asyncio.Event()
@property @property
def rpc_url(self): def rpc_url(self):
return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.rpcport}/' return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.rpcport}/'
def is_expected_block(self, e: BlockHeightEvent):
return self.block_expected == e.height
@property @property
def exists(self): def exists(self):
return ( return (
@ -311,6 +364,12 @@ class BlockchainNode:
) )
def download(self): def download(self):
uname = platform.uname()
target_os = str.lower(uname.system)
target_arch = str.replace(uname.machine, 'x86_64', 'amd64')
target_platform = target_os + '_' + target_arch
self.latest_release_url = str.replace(self.latest_release_url, 'TARGET_PLATFORM', target_platform)
downloaded_file = os.path.join( downloaded_file = os.path.join(
self.bin_dir, self.bin_dir,
self.latest_release_url[self.latest_release_url.rfind('/')+1:] self.latest_release_url[self.latest_release_url.rfind('/')+1:]
@ -345,34 +404,34 @@ class BlockchainNode:
async def start(self): async def start(self):
assert self.ensure() assert self.ensure()
self.data_path = tempfile.mkdtemp()
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
asyncio.get_child_watcher().attach_loop(loop) asyncio.get_child_watcher().attach_loop(loop)
command = [ command = [
self.daemon_bin, self.daemon_bin,
f'-datadir={self.data_path}', '-printtoconsole', '-regtest', '-server', '-txindex', '--notls',
f'-rpcuser={self.rpcuser}', f'-rpcpassword={self.rpcpassword}', f'-rpcport={self.rpcport}', f'--datadir={self.data_path}',
f'-port={self.peerport}' '--regtest', f'--listen=127.0.0.1:{self.peerport}', f'--rpclisten=127.0.0.1:{self.rpcport}',
'--txindex', f'--rpcuser={self.rpcuser}', f'--rpcpass={self.rpcpassword}'
] ]
self.log.info(' '.join(command)) self.log.info(' '.join(command))
while not self.stopped: while self.stopped:
if self.running.is_set(): if self.running.is_set():
await asyncio.sleep(1) await asyncio.sleep(1)
continue continue
await self.restart_ready.wait()
try: try:
self.transport, self.protocol = await loop.subprocess_exec( self.transport, self.protocol = await loop.subprocess_exec(
BlockchainProcess, *command LBCDProcess, *command
) )
await self.protocol.ready.wait() await self.protocol.ready.wait()
assert not self.protocol.stopped.is_set() assert not self.protocol.stopped.is_set()
self.running.set() self.running.set()
self.stopped = False
except asyncio.CancelledError: except asyncio.CancelledError:
self.running.clear() self.running.clear()
raise raise
except Exception as e: except Exception as e:
self.running.clear() self.running.clear()
log.exception('failed to start lbrycrdd', exc_info=e) log.exception('failed to start lbcd', exc_info=e)
async def stop(self, cleanup=True): async def stop(self, cleanup=True):
self.stopped = True self.stopped = True
@ -381,26 +440,147 @@ class BlockchainNode:
await self.protocol.stopped.wait() await self.protocol.stopped.wait()
self.transport.close() self.transport.close()
finally: finally:
self.log.info("Done shutting down " + self.daemon_bin)
if cleanup: if cleanup:
self.cleanup() self.cleanup()
self.running.clear()
async def clear_mempool(self):
self.restart_ready.clear()
self.transport.terminate()
await self.protocol.stopped.wait()
self.transport.close()
self.running.clear()
os.remove(os.path.join(self.data_path, 'regtest', 'mempool.dat'))
self.restart_ready.set()
await self.running.wait()
def cleanup(self): def cleanup(self):
assert self.stopped
shutil.rmtree(self.data_path, ignore_errors=True) shutil.rmtree(self.data_path, ignore_errors=True)
class LBCWalletNode:
P2SH_SEGWIT_ADDRESS = "p2sh-segwit"
BECH32_ADDRESS = "bech32"
def __init__(self, url, lbcwallet, cli):
self.latest_release_url = url
self.project_dir = os.path.dirname(os.path.dirname(__file__))
self.bin_dir = os.path.join(self.project_dir, 'bin')
self.lbcwallet_bin = os.path.join(self.bin_dir, lbcwallet)
self.cli_bin = os.path.join(self.bin_dir, cli)
self.log = log.getChild('lbcwallet')
self.protocol = None
self.transport = None
self.hostname = 'localhost'
self.lbcd_rpcport = 29245
self.lbcwallet_rpcport = 29244
self.rpcuser = 'rpcuser'
self.rpcpassword = 'rpcpassword'
self.data_path = tempfile.mkdtemp()
self.stopped = True
self.running = asyncio.Event()
self.block_expected = 0
self.mining_addr = ''
@property
def rpc_url(self):
# FIXME: somehow the hub/sdk doesn't learn the blocks through the wallet RPC port, why?
# return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.lbcwallet_rpcport}/'
return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.lbcd_rpcport}/'
def is_expected_block(self, e: BlockHeightEvent):
return self.block_expected == e.height
@property
def exists(self):
return (
os.path.exists(self.lbcwallet_bin)
)
def download(self):
uname = platform.uname()
target_os = str.lower(uname.system)
target_arch = str.replace(uname.machine, 'x86_64', 'amd64')
target_platform = target_os + '_' + target_arch
self.latest_release_url = str.replace(self.latest_release_url, 'TARGET_PLATFORM', target_platform)
downloaded_file = os.path.join(
self.bin_dir,
self.latest_release_url[self.latest_release_url.rfind('/')+1:]
)
if not os.path.exists(self.bin_dir):
os.mkdir(self.bin_dir)
if not os.path.exists(downloaded_file):
self.log.info('Downloading: %s', self.latest_release_url)
with urllib.request.urlopen(self.latest_release_url) as response:
with open(downloaded_file, 'wb') as out_file:
shutil.copyfileobj(response, out_file)
self.log.info('Extracting: %s', downloaded_file)
if downloaded_file.endswith('.zip'):
with zipfile.ZipFile(downloaded_file) as dotzip:
dotzip.extractall(self.bin_dir)
# zipfile bug https://bugs.python.org/issue15795
os.chmod(self.lbcwallet_bin, 0o755)
elif downloaded_file.endswith('.tar.gz'):
with tarfile.open(downloaded_file) as tar:
tar.extractall(self.bin_dir)
return self.exists
def ensure(self):
return self.exists or self.download()
async def start(self):
assert self.ensure()
loop = asyncio.get_event_loop()
asyncio.get_child_watcher().attach_loop(loop)
command = [
self.lbcwallet_bin,
'--noservertls', '--noclienttls',
'--regtest',
f'--rpcconnect=127.0.0.1:{self.lbcd_rpcport}', f'--rpclisten=127.0.0.1:{self.lbcwallet_rpcport}',
'--createtemp', f'--appdata={self.data_path}',
f'--username={self.rpcuser}', f'--password={self.rpcpassword}'
]
self.log.info(' '.join(command))
while self.stopped:
if self.running.is_set():
await asyncio.sleep(1)
continue
try:
self.transport, self.protocol = await loop.subprocess_exec(
WalletProcess, *command
)
self.protocol.transport = self.transport
await self.protocol.ready.wait()
assert not self.protocol.stopped.is_set()
self.running.set()
self.stopped = False
except asyncio.CancelledError:
self.running.clear()
raise
except Exception as e:
self.running.clear()
log.exception('failed to start lbcwallet', exc_info=e)
def cleanup(self):
assert self.stopped
shutil.rmtree(self.data_path, ignore_errors=True)
async def stop(self, cleanup=True):
self.stopped = True
try:
self.transport.terminate()
await self.protocol.stopped.wait()
self.transport.close()
finally:
self.log.info("Done shutting down " + self.lbcwallet_bin)
if cleanup:
self.cleanup()
self.running.clear()
async def _cli_cmnd(self, *args): async def _cli_cmnd(self, *args):
cmnd_args = [ cmnd_args = [
self.cli_bin, f'-datadir={self.data_path}', '-regtest', self.cli_bin,
f'-rpcuser={self.rpcuser}', f'-rpcpassword={self.rpcpassword}', f'-rpcport={self.rpcport}' f'--rpcuser={self.rpcuser}', f'--rpcpass={self.rpcpassword}', '--notls', '--regtest', '--wallet'
] + list(args) ] + list(args)
self.log.info(' '.join(cmnd_args)) self.log.info(' '.join(cmnd_args))
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
@ -417,7 +597,14 @@ class BlockchainNode:
def generate(self, blocks): def generate(self, blocks):
self.block_expected += blocks self.block_expected += blocks
return self._cli_cmnd('generate', str(blocks)) return self._cli_cmnd('generatetoaddress', str(blocks), self.mining_addr)
def generate_to_address(self, blocks, addr):
self.block_expected += blocks
return self._cli_cmnd('generatetoaddress', str(blocks), addr)
def wallet_passphrase(self, passphrase, timeout):
return self._cli_cmnd('walletpassphrase', passphrase, str(timeout))
def invalidate_block(self, blockhash): def invalidate_block(self, blockhash):
return self._cli_cmnd('invalidateblock', blockhash) return self._cli_cmnd('invalidateblock', blockhash)
@ -434,7 +621,7 @@ class BlockchainNode:
def get_raw_change_address(self): def get_raw_change_address(self):
return self._cli_cmnd('getrawchangeaddress') return self._cli_cmnd('getrawchangeaddress')
def get_new_address(self, address_type): def get_new_address(self, address_type='legacy'):
return self._cli_cmnd('getnewaddress', "", address_type) return self._cli_cmnd('getnewaddress', "", address_type)
async def get_balance(self): async def get_balance(self):
@ -450,7 +637,10 @@ class BlockchainNode:
return self._cli_cmnd('createrawtransaction', json.dumps(inputs), json.dumps(outputs)) return self._cli_cmnd('createrawtransaction', json.dumps(inputs), json.dumps(outputs))
async def sign_raw_transaction_with_wallet(self, tx): async def sign_raw_transaction_with_wallet(self, tx):
return json.loads(await self._cli_cmnd('signrawtransactionwithwallet', tx))['hex'].encode() # the "withwallet" portion should only come into play if we are doing segwit.
# and "withwallet" doesn't exist on lbcd yet.
result = await self._cli_cmnd('signrawtransaction', tx)
return json.loads(result)['hex'].encode()
def decode_raw_transaction(self, tx): def decode_raw_transaction(self, tx):
return self._cli_cmnd('decoderawtransaction', hexlify(tx.raw).decode()) return self._cli_cmnd('decoderawtransaction', hexlify(tx.raw).decode())
@ -496,8 +686,6 @@ class HubNode:
self.hostname = 'localhost' self.hostname = 'localhost'
self.rpcport = 50051 # avoid conflict with default rpc port self.rpcport = 50051 # avoid conflict with default rpc port
self.stopped = False self.stopped = False
self.restart_ready = asyncio.Event()
self.restart_ready.set()
self.running = asyncio.Event() self.running = asyncio.Event()
@property @property
@ -554,7 +742,6 @@ class HubNode:
if self.running.is_set(): if self.running.is_set():
await asyncio.sleep(1) await asyncio.sleep(1)
continue continue
await self.restart_ready.wait()
try: try:
if not self.debug: if not self.debug:
self.transport, self.protocol = await loop.subprocess_exec( self.transport, self.protocol = await loop.subprocess_exec(

View file

@ -61,8 +61,10 @@ class ConductorService:
#set_logging( #set_logging(
# self.stack.ledger_module, logging.DEBUG, WebSocketLogHandler(self.send_message) # self.stack.ledger_module, logging.DEBUG, WebSocketLogHandler(self.send_message)
#) #)
self.stack.blockchain_started or await self.stack.start_blockchain() self.stack.lbcd_started or await self.stack.start_lbcd()
self.send_message({'type': 'service', 'name': 'blockchain', 'port': self.stack.blockchain_node.port}) self.send_message({'type': 'service', 'name': 'lbcd', 'port': self.stack.lbcd_node.port})
self.stack.lbcwallet_started or await self.stack.start_lbcwallet()
self.send_message({'type': 'service', 'name': 'lbcwallet', 'port': self.stack.lbcwallet_node.port})
self.stack.spv_started or await self.stack.start_spv() self.stack.spv_started or await self.stack.start_spv()
self.send_message({'type': 'service', 'name': 'spv', 'port': self.stack.spv_node.port}) self.send_message({'type': 'service', 'name': 'spv', 'port': self.stack.spv_node.port})
self.stack.wallet_started or await self.stack.start_wallet() self.stack.wallet_started or await self.stack.start_wallet()
@ -74,7 +76,7 @@ class ConductorService:
async def generate(self, request): async def generate(self, request):
data = await request.post() data = await request.post()
blocks = data.get('blocks', 1) blocks = data.get('blocks', 1)
await self.stack.blockchain_node.generate(int(blocks)) await self.stack.lbcwallet_node.generate(int(blocks))
return json_response({'blocks': blocks}) return json_response({'blocks': blocks})
async def transfer(self, request): async def transfer(self, request):
@ -85,7 +87,7 @@ class ConductorService:
if not address: if not address:
raise ValueError("No address was provided.") raise ValueError("No address was provided.")
amount = data.get('amount', 1) amount = data.get('amount', 1)
txid = await self.stack.blockchain_node.send_to_address(address, amount) txid = await self.stack.lbcwallet_node.send_to_address(address, amount)
if self.stack.wallet_started: if self.stack.wallet_started:
await self.stack.wallet_node.ledger.on_transaction.where( await self.stack.wallet_node.ledger.on_transaction.where(
lambda e: e.tx.id == txid and e.address == address lambda e: e.tx.id == txid and e.address == address
@ -98,7 +100,7 @@ class ConductorService:
async def balance(self, _): async def balance(self, _):
return json_response({ return json_response({
'balance': await self.stack.blockchain_node.get_balance() 'balance': await self.stack.lbcwallet_node.get_balance()
}) })
async def log(self, request): async def log(self, request):
@ -129,7 +131,7 @@ class ConductorService:
'type': 'status', 'type': 'status',
'height': self.stack.wallet_node.ledger.headers.height, 'height': self.stack.wallet_node.ledger.headers.height,
'balance': satoshis_to_coins(await self.stack.wallet_node.account.get_balance()), 'balance': satoshis_to_coins(await self.stack.wallet_node.account.get_balance()),
'miner': await self.stack.blockchain_node.get_balance() 'miner': await self.stack.lbcwallet_node.get_balance()
}) })
def send_message(self, msg): def send_message(self, msg):

View file

@ -250,14 +250,14 @@ class Daemon:
async def deserialised_block(self, hex_hash): async def deserialised_block(self, hex_hash):
"""Return the deserialised block with the given hex hash.""" """Return the deserialised block with the given hex hash."""
if hex_hash not in self._block_cache: if hex_hash not in self._block_cache:
block = await self._send_single('getblock', (hex_hash, True)) block = await self._send_single('getblock', (hex_hash, 1))
self._block_cache[hex_hash] = block self._block_cache[hex_hash] = block
return block return block
return self._block_cache[hex_hash] return self._block_cache[hex_hash]
async def raw_blocks(self, hex_hashes): async def raw_blocks(self, hex_hashes):
"""Return the raw binary blocks with the given hex hashes.""" """Return the raw binary blocks with the given hex hashes."""
params_iterable = ((h, False) for h in hex_hashes) params_iterable = ((h, 0) for h in hex_hashes)
blocks = await self._send_vector('getblock', params_iterable) blocks = await self._send_vector('getblock', params_iterable)
# Convert hex string to bytes # Convert hex string to bytes
return [hex_to_bytes(block) for block in blocks] return [hex_to_bytes(block) for block in blocks]
@ -334,42 +334,7 @@ class LBCDaemon(Daemon):
async def getrawtransaction(self, hex_hash, verbose=False): async def getrawtransaction(self, hex_hash, verbose=False):
return await super().getrawtransaction(hex_hash=hex_hash, verbose=verbose) return await super().getrawtransaction(hex_hash=hex_hash, verbose=verbose)
@handles_errors
async def getclaimbyid(self, claim_id):
'''Given a claim id, retrieves claim information.'''
return await self._send_single('getclaimbyid', (claim_id,))
@handles_errors
async def getclaimsbyids(self, claim_ids):
'''Given a list of claim ids, batches calls to retrieve claim information.'''
return await self._send_vector('getclaimbyid', ((claim_id,) for claim_id in claim_ids))
@handles_errors @handles_errors
async def getclaimsforname(self, name): async def getclaimsforname(self, name):
'''Given a name, retrieves all claims matching that name.''' '''Given a name, retrieves all claims matching that name.'''
return await self._send_single('getclaimsforname', (name,)) return await self._send_single('getclaimsforname', (name,))
@handles_errors
async def getclaimsfortx(self, txid):
'''Given a txid, returns the claims it make.'''
return await self._send_single('getclaimsfortx', (txid,)) or []
@handles_errors
async def getnameproof(self, name, block_hash=None):
'''Given a name and optional block_hash, returns a name proof and winner, if any.'''
return await self._send_single('getnameproof', (name, block_hash,) if block_hash else (name,))
@handles_errors
async def getvalueforname(self, name):
'''Given a name, returns the winning claim value.'''
return await self._send_single('getvalueforname', (name,))
@handles_errors
async def getnamesintrie(self):
'''Given a name, returns the winning claim value.'''
return await self._send_single('getnamesintrie')
@handles_errors
async def claimname(self, name, hexvalue, amount):
'''Claim a name, used for functional tests only.'''
return await self._send_single('claimname', (name, hexvalue, float(amount)))
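
A brief, hedged note on the getblock argument change above (my reading, not stated in the diff): lbcd expects an integer verbosity where lbrycrd accepted a boolean, so True/False become 1/0; per the surrounding code, 0 returns the serialized hex block and 1 returns the decoded JSON object. Illustratively:

    # Hedged illustration of the verbosity change; not part of the diff.
    # getblock verbosity: 0 -> serialized hex string, 1 -> decoded JSON object.
    async def fetch_block(daemon, hex_hash):
        raw_hex = await daemon._send_single('getblock', (hex_hash, 0))
        decoded = await daemon._send_single('getblock', (hex_hash, 1))
        return raw_hex, decoded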

View file

@ -1,32 +1,130 @@
import struct import struct
import rocksdb
from typing import Optional from typing import Optional
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
class KeyValueStorage: class RocksDBStore:
def __init__(self, path: str, cache_mb: int, max_open_files: int, secondary_path: str = ''):
# Use snappy compression (the default)
self.path = path
self._max_open_files = max_open_files
self.db = rocksdb.DB(path, self.get_options(), secondary_name=secondary_path)
# self.multi_get = self.db.multi_get
def get_options(self):
return rocksdb.Options(
create_if_missing=True, use_fsync=True, target_file_size_base=33554432,
max_open_files=self._max_open_files
)
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]: def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
raise NotImplemented() return self.db.get(key, fill_cache=fill_cache)
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None, def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
include_key=True, include_value=True, fill_cache=True): include_key=True, include_value=True, fill_cache=True):
raise NotImplemented() return RocksDBIterator(
self.db, reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
prefix=prefix, include_key=include_key, include_value=include_value
)
def write_batch(self, transaction: bool = False): def write_batch(self, disable_wal: bool = False, sync: bool = False):
raise NotImplemented() return RocksDBWriteBatch(self.db, sync=sync, disable_wal=disable_wal)
def close(self): def close(self):
raise NotImplemented() self.db.close()
self.db = None
@property @property
def closed(self) -> bool: def closed(self) -> bool:
raise NotImplemented() return self.db is None
def try_catch_up_with_primary(self):
self.db.try_catch_up_with_primary()
class RocksDBWriteBatch:
def __init__(self, db: rocksdb.DB, sync: bool = False, disable_wal: bool = False):
self.batch = rocksdb.WriteBatch()
self.db = db
self.sync = sync
self.disable_wal = disable_wal
def __enter__(self):
return self.batch
def __exit__(self, exc_type, exc_val, exc_tb):
if not exc_val:
self.db.write(self.batch, sync=self.sync, disable_wal=self.disable_wal)
class RocksDBIterator:
"""An iterator for RocksDB."""
__slots__ = [
'start',
'prefix',
'stop',
'iterator',
'include_key',
'include_value',
'prev_k',
'reverse',
'include_start',
'include_stop'
]
def __init__(self, db: rocksdb.DB, prefix: bytes = None, start: bool = None, stop: bytes = None,
include_key: bool = True, include_value: bool = True, reverse: bool = False,
include_start: bool = True, include_stop: bool = False):
assert (start is None and stop is None) or (prefix is None), 'cannot use start/stop and prefix'
self.start = start
self.prefix = prefix
self.stop = stop
self.iterator = db.iteritems() if not reverse else reversed(db.iteritems())
if prefix is not None:
self.iterator.seek(prefix)
elif start is not None:
self.iterator.seek(start)
self.include_key = include_key
self.include_value = include_value
self.prev_k = None
self.reverse = reverse
self.include_start = include_start
self.include_stop = include_stop
def __iter__(self):
return self
def _check_stop_iteration(self, key: bytes):
if self.stop is not None and (key.startswith(self.stop) or self.stop < key[:len(self.stop)]):
raise StopIteration
elif self.start is not None and self.start > key[:len(self.start)]:
raise StopIteration
elif self.prefix is not None and not key.startswith(self.prefix):
raise StopIteration
def __next__(self):
# TODO: include start/stop on/off
# check for needing to stop from previous iteration
if self.prev_k is not None:
self._check_stop_iteration(self.prev_k)
k, v = next(self.iterator)
self._check_stop_iteration(k)
self.prev_k = k
if self.include_key and self.include_value:
return k, v
elif self.include_key:
return k
return v
class PrefixDB: class PrefixDB:
UNDO_KEY_STRUCT = struct.Struct(b'>Q') UNDO_KEY_STRUCT = struct.Struct(b'>Q')
def __init__(self, db: KeyValueStorage, max_undo_depth: int = 200, unsafe_prefixes=None): def __init__(self, db: RocksDBStore, max_undo_depth: int = 200, unsafe_prefixes=None):
self._db = db self._db = db
self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes) self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes)
self._max_undo_depth = max_undo_depth self._max_undo_depth = max_undo_depth
@ -37,7 +135,7 @@ class PrefixDB:
Changes written cannot be undone Changes written cannot be undone
""" """
try: try:
with self._db.write_batch(transaction=True) as batch: with self._db.write_batch(sync=True) as batch:
batch_put = batch.put batch_put = batch.put
batch_delete = batch.delete batch_delete = batch.delete
for staged_change in self._op_stack: for staged_change in self._op_stack:
@ -61,7 +159,7 @@ class PrefixDB:
include_value=False include_value=False
)) ))
try: try:
with self._db.write_batch(transaction=True) as batch: with self._db.write_batch(sync=True) as batch:
batch_put = batch.put batch_put = batch.put
batch_delete = batch.delete batch_delete = batch.delete
for staged_change in self._op_stack: for staged_change in self._op_stack:
@ -82,7 +180,7 @@ class PrefixDB:
undo_key = DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height) undo_key = DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height)
self._op_stack.apply_packed_undo_ops(self._db.get(undo_key)) self._op_stack.apply_packed_undo_ops(self._db.get(undo_key))
try: try:
with self._db.write_batch(transaction=True) as batch: with self._db.write_batch(sync=True) as batch:
batch_put = batch.put batch_put = batch.put
batch_delete = batch.delete batch_delete = batch.delete
for staged_change in self._op_stack: for staged_change in self._op_stack:
@ -108,6 +206,9 @@ class PrefixDB:
if not self._db.closed: if not self._db.closed:
self._db.close() self._db.close()
def try_catch_up_with_primary(self):
self._db.try_catch_up_with_primary()
@property @property
def closed(self): def closed(self):
return self._db.closed return self._db.closed
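
A hedged usage sketch of the new RocksDBStore wrapper follows; paths and key/value bytes are illustrative, and it assumes the lbry-rocksdb API as exercised in this diff.

    import tempfile
    from lbry.wallet.server.db.db import RocksDBStore

    store = RocksDBStore(tempfile.mkdtemp(), cache_mb=16, max_open_files=64)
    with store.write_batch(sync=True) as batch:   # RocksDBWriteBatch commits on exit
        batch.put(b'a-key-1', b'value-1')
        batch.put(b'a-key-2', b'value-2')
    assert store.get(b'a-key-1') == b'value-1'
    for key in store.iterator(prefix=b'a-', include_value=False):   # keys only
        print(key)
    store.close()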

View file

@ -4,7 +4,7 @@ import array
import base64 import base64
from typing import Union, Tuple, NamedTuple, Optional from typing import Union, Tuple, NamedTuple, Optional
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.db import KeyValueStorage, PrefixDB from lbry.wallet.server.db.db import RocksDBStore, PrefixDB
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
from lbry.schema.url import normalize_name from lbry.schema.url import normalize_name
@ -38,7 +38,7 @@ class PrefixRow(metaclass=PrefixRowType):
value_struct: struct.Struct value_struct: struct.Struct
key_part_lambdas = [] key_part_lambdas = []
def __init__(self, db: KeyValueStorage, op_stack: RevertableOpStack): def __init__(self, db: RocksDBStore, op_stack: RevertableOpStack):
self._db = db self._db = db
self._op_stack = op_stack self._op_stack = op_stack
@ -1595,40 +1595,10 @@ class BlockTxsPrefixRow(PrefixRow):
return cls.pack_key(height), cls.pack_value(tx_hashes) return cls.pack_key(height), cls.pack_value(tx_hashes)
class LevelDBStore(KeyValueStorage):
def __init__(self, path: str, cache_mb: int, max_open_files: int):
import plyvel
self.db = plyvel.DB(
path, create_if_missing=True, max_open_files=max_open_files,
lru_cache_size=cache_mb * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
)
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
return self.db.get(key, fill_cache=fill_cache)
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
include_key=True, include_value=True, fill_cache=True):
return self.db.iterator(
reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
prefix=prefix, include_key=include_key, include_value=include_value, fill_cache=fill_cache
)
def write_batch(self, transaction: bool = False, sync: bool = False):
return self.db.write_batch(transaction=transaction, sync=sync)
def close(self):
return self.db.close()
@property
def closed(self) -> bool:
return self.db.closed
class HubDB(PrefixDB): class HubDB(PrefixDB):
def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512, def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512,
unsafe_prefixes: Optional[typing.Set[bytes]] = None): secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None):
db = LevelDBStore(path, cache_mb, max_open_files) db = RocksDBStore(path, cache_mb, max_open_files, secondary_path=secondary_path)
super().__init__(db, reorg_limit, unsafe_prefixes=unsafe_prefixes) super().__init__(db, reorg_limit, unsafe_prefixes=unsafe_prefixes)
self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack) self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack)
self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack) self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack)
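
The secondary_path parameter added above enables RocksDB's secondary-instance (read-only follower) mode. A hedged sketch of how that could be used; the paths are illustrative and the import path is an assumption based on this diff.

    from lbry.wallet.server.db.prefixes import HubDB

    writer = HubDB('/tmp/hub-db', cache_mb=64)
    reader = HubDB('/tmp/hub-db', cache_mb=64, secondary_path='/tmp/hub-db-secondary')
    # ... writer commits new blocks ...
    reader.try_catch_up_with_primary()   # pull changes written by the primary instance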

View file

@ -26,7 +26,10 @@ class Server:
self.prometheus_server: typing.Optional[PrometheusServer] = None self.prometheus_server: typing.Optional[PrometheusServer] = None
self.session_mgr = LBRYSessionManager( self.session_mgr = LBRYSessionManager(
env, db, bp, daemon, self.shutdown_event env, db, bp.mempool, bp.history_cache, bp.resolve_cache, bp.resolve_outputs_cache, daemon,
self.shutdown_event,
on_available_callback=bp.status_server.set_available,
on_unavailable_callback=bp.status_server.set_unavailable
) )
self._indexer_task = None self._indexer_task = None

View file

@ -170,13 +170,16 @@ class SessionManager:
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
) )
def __init__(self, env: 'Env', db: LevelDB, bp: BlockProcessor, daemon: 'Daemon', shutdown_event: asyncio.Event): def __init__(self, env: 'Env', db: LevelDB, mempool, history_cache, resolve_cache, resolve_outputs_cache,
daemon: 'Daemon', shutdown_event: asyncio.Event,
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
self.env = env self.env = env
self.db = db self.db = db
self.bp = bp self.on_available_callback = on_available_callback
self.on_unavailable_callback = on_unavailable_callback
self.daemon = daemon self.daemon = daemon
self.mempool = bp.mempool self.mempool = mempool
self.shutdown_event = shutdown_event self.shutdown_event = shutdown_event
self.logger = util.class_logger(__name__, self.__class__.__name__) self.logger = util.class_logger(__name__, self.__class__.__name__)
self.servers: typing.Dict[str, asyncio.AbstractServer] = {} self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
@ -186,7 +189,9 @@ class SessionManager:
self.cur_group = SessionGroup(0) self.cur_group = SessionGroup(0)
self.txs_sent = 0 self.txs_sent = 0
self.start_time = time.time() self.start_time = time.time()
self.history_cache = self.bp.history_cache self.history_cache = history_cache
self.resolve_cache = resolve_cache
self.resolve_outputs_cache = resolve_outputs_cache
self.notified_height: typing.Optional[int] = None self.notified_height: typing.Optional[int] = None
# Cache some idea of room to avoid recounting on each subscription # Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0 self.subs_room = 0
@ -243,7 +248,7 @@ class SessionManager:
await self.session_event.wait() await self.session_event.wait()
self.session_event.clear() self.session_event.clear()
if not paused and len(self.sessions) >= max_sessions: if not paused and len(self.sessions) >= max_sessions:
self.bp.status_server.set_unavailable() self.on_unavailable_callback()
self.logger.info(f'maximum sessions {max_sessions:,d} ' self.logger.info(f'maximum sessions {max_sessions:,d} '
f'reached, stopping new connections until ' f'reached, stopping new connections until '
f'count drops to {low_watermark:,d}') f'count drops to {low_watermark:,d}')
@ -252,7 +257,7 @@ class SessionManager:
# Start listening for incoming connections if paused and # Start listening for incoming connections if paused and
# session count has fallen # session count has fallen
if paused and len(self.sessions) <= low_watermark: if paused and len(self.sessions) <= low_watermark:
self.bp.status_server.set_available() self.on_available_callback()
self.logger.info('resuming listening for incoming connections') self.logger.info('resuming listening for incoming connections')
await self._start_external_servers() await self._start_external_servers()
paused = False paused = False
@ -533,7 +538,7 @@ class SessionManager:
await self.start_other() await self.start_other()
await self._start_external_servers() await self._start_external_servers()
server_listening_event.set() server_listening_event.set()
self.bp.status_server.set_available() self.on_available_callback()
# Peer discovery should start after the external servers # Peer discovery should start after the external servers
# because we connect to ourself # because we connect to ourself
await asyncio.wait([ await asyncio.wait([
@ -628,8 +633,9 @@ class SessionManager:
for hashX in touched.intersection(self.mempool_statuses.keys()): for hashX in touched.intersection(self.mempool_statuses.keys()):
self.mempool_statuses.pop(hashX, None) self.mempool_statuses.pop(hashX, None)
# self.bp._chain_executor
await asyncio.get_event_loop().run_in_executor( await asyncio.get_event_loop().run_in_executor(
self.bp._chain_executor, touched.intersection_update, self.hashx_subscriptions_by_session.keys() None, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
) )
if touched or new_touched or (height_changed and self.mempool_statuses): if touched or new_touched or (height_changed and self.mempool_statuses):
@ -866,8 +872,7 @@ class LBRYElectrumX(SessionBase):
self.protocol_tuple = self.PROTOCOL_MIN self.protocol_tuple = self.PROTOCOL_MIN
self.protocol_string = None self.protocol_string = None
self.daemon = self.session_mgr.daemon self.daemon = self.session_mgr.daemon
self.bp: BlockProcessor = self.session_mgr.bp self.db: LevelDB = self.session_mgr.db
self.db: LevelDB = self.bp.db
@classmethod @classmethod
def protocol_min_max_strings(cls): def protocol_min_max_strings(cls):
@ -1007,21 +1012,21 @@ class LBRYElectrumX(SessionBase):
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start) self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
async def _cached_resolve_url(self, url): async def _cached_resolve_url(self, url):
if url not in self.bp.resolve_cache: if url not in self.session_mgr.resolve_cache:
self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url) self.session_mgr.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
return self.bp.resolve_cache[url] return self.session_mgr.resolve_cache[url]
async def claimtrie_resolve(self, *urls) -> str: async def claimtrie_resolve(self, *urls) -> str:
sorted_urls = tuple(sorted(urls)) sorted_urls = tuple(sorted(urls))
self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls)) self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
try: try:
if sorted_urls in self.bp.resolve_outputs_cache: if sorted_urls in self.session_mgr.resolve_outputs_cache:
return self.bp.resolve_outputs_cache[sorted_urls] return self.session_mgr.resolve_outputs_cache[sorted_urls]
rows, extra = [], [] rows, extra = [], []
for url in urls: for url in urls:
if url not in self.bp.resolve_cache: if url not in self.session_mgr.resolve_cache:
self.bp.resolve_cache[url] = await self._cached_resolve_url(url) self.session_mgr.resolve_cache[url] = await self._cached_resolve_url(url)
stream, channel, repost, reposted_channel = self.bp.resolve_cache[url] stream, channel, repost, reposted_channel = self.session_mgr.resolve_cache[url]
if isinstance(channel, ResolveCensoredError): if isinstance(channel, ResolveCensoredError):
rows.append(channel) rows.append(channel)
extra.append(channel.censor_row) extra.append(channel.censor_row)
@ -1046,7 +1051,7 @@ class LBRYElectrumX(SessionBase):
if reposted_channel: if reposted_channel:
extra.append(reposted_channel) extra.append(reposted_channel)
await asyncio.sleep(0) await asyncio.sleep(0)
self.bp.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor( self.session_mgr.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
None, Outputs.to_base64, rows, extra, 0, None, None None, Outputs.to_base64, rows, extra, 0, None, None
) )
return result return result
@ -1054,7 +1059,7 @@ class LBRYElectrumX(SessionBase):
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls)) self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
async def get_server_height(self): async def get_server_height(self):
return self.bp.height return self.db.db_height
async def transaction_get_height(self, tx_hash): async def transaction_get_height(self, tx_hash):
self.assert_tx_hash(tx_hash) self.assert_tx_hash(tx_hash)
@ -1466,7 +1471,7 @@ class LBRYElectrumX(SessionBase):
if mempool_tx: if mempool_tx:
raw_tx, block_hash = mempool_tx.raw_tx.hex(), None raw_tx, block_hash = mempool_tx.raw_tx.hex(), None
else: else:
tx_info = await self.daemon_request('getrawtransaction', tx_hash, True) tx_info = await self.daemon_request('getrawtransaction', tx_hash, 1)
raw_tx = tx_info['hex'] raw_tx = tx_info['hex']
block_hash = tx_info.get('blockhash') block_hash = tx_info.get('blockhash')
if block_hash: if block_hash:
@ -1503,7 +1508,7 @@ class LBRYElectrumX(SessionBase):
if verbose not in (True, False): if verbose not in (True, False):
raise RPCError(BAD_REQUEST, f'"verbose" must be a boolean') raise RPCError(BAD_REQUEST, f'"verbose" must be a boolean')
return await self.daemon_request('getrawtransaction', tx_hash, verbose) return await self.daemon_request('getrawtransaction', tx_hash, int(verbose))
def _get_merkle_branch(self, tx_hashes, tx_pos): def _get_merkle_branch(self, tx_hashes, tx_pos):
"""Return a merkle branch to a transaction. """Return a merkle branch to a transaction.

View file

@ -40,22 +40,17 @@ def checkrecord(record, expected_winner, expected_claim):
async def checkcontrolling(daemon: Daemon, db: SQLDB): async def checkcontrolling(daemon: Daemon, db: SQLDB):
records, claim_ids, names, futs = [], [], [], [] records, names, futs = [], [], []
for record in db.get_claims('claimtrie.claim_hash as is_controlling, claim.*', is_controlling=True): for record in db.get_claims('claimtrie.claim_hash as is_controlling, claim.*', is_controlling=True):
records.append(record) records.append(record)
claim_id = hex_reverted(record['claim_hash']) claim_id = hex_reverted(record['claim_hash'])
claim_ids.append((claim_id,)) names.append((record['normalized'], (claim_id,), "", True)) # last parameter is IncludeValues
names.append((record['normalized'],))
if len(names) > 50000: if len(names) > 50000:
futs.append(daemon._send_vector('getvalueforname', names[:])) futs.append(daemon._send_vector('getclaimsfornamebyid', names))
futs.append(daemon._send_vector('getclaimbyid', claim_ids[:]))
names.clear() names.clear()
claim_ids.clear()
if names: if names:
futs.append(daemon._send_vector('getvalueforname', names[:])) futs.append(daemon._send_vector('getclaimsfornamebyid', names))
futs.append(daemon._send_vector('getclaimbyid', claim_ids[:]))
names.clear() names.clear()
claim_ids.clear()
while futs: while futs:
winners, claims = futs.pop(0), futs.pop(0) winners, claims = futs.pop(0), futs.pop(0)

View file

@@ -7,9 +7,11 @@ BASE = os.path.dirname(__file__)
 with open(os.path.join(BASE, 'README.md'), encoding='utf-8') as fh:
     long_description = fh.read()

-PLYVEL = []
-if sys.platform.startswith('linux'):
-    PLYVEL.append('plyvel==1.3.0')
+ROCKSDB = []
+if sys.platform.startswith('linux') or sys.platform.startswith('darwin'):
+    ROCKSDB.append('lbry-rocksdb==0.8.1')

 setup(
     name=__name__,
@@ -57,7 +59,7 @@ setup(
         'pylru==1.1.0',
         'elasticsearch==7.10.1',
         'grpcio==1.38.0'
-    ] + PLYVEL,
+    ] + ROCKSDB,
     extras_require={
         'torrent': ['lbry-libtorrent'],
         'lint': ['pylint==2.10.0'],

test_rocksdb.py (new executable file, 47 lines)

@@ -0,0 +1,47 @@
+#! python
+import os
+import shutil
+import rocksdb
+import tempfile
+import logging
+
+log = logging.getLogger()
+log.addHandler(logging.StreamHandler())
+log.setLevel(logging.INFO)
+
+
+def _main(db_loc):
+    opts = rocksdb.Options(create_if_missing=True)
+    db = rocksdb.DB(os.path.join(db_loc, "test"), opts)
+    secondary_location = os.path.join(db_loc, "secondary")
+    secondary = rocksdb.DB(
+        os.path.join(db_loc, "test"),
+        rocksdb.Options(create_if_missing=True, max_open_files=-1),
+        secondary_name=secondary_location
+    )
+    try:
+        assert secondary.get(b"a") is None
+        db.put(b"a", b"b")
+        assert db.get(b"a") == b"b"
+        assert secondary.get(b"a") is None
+        secondary.try_catch_up_with_primary()
+        assert secondary.get(b"a") == b"b"
+    finally:
+        secondary.close()
+        db.close()
+
+
+def main():
+    db_dir = tempfile.mkdtemp()
+    try:
+        _main(db_dir)
+        log.info("rocksdb %s (%s) works!", rocksdb.__version__, rocksdb.ROCKSDB_VERSION)
+    except:
+        log.exception("boom")
+    finally:
+        shutil.rmtree(db_dir)
+
+
+if __name__ == "__main__":
+    main()

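Assuming the lbry-rocksdb wheel installed correctly, running `python test_rocksdb.py` should log something like `rocksdb <binding version> (<library version>) works!` (the exact wording comes from the script's final log call), after exercising both a primary database and a secondary instance that only sees writes once `try_catch_up_with_primary()` is called. A broken setup logs the `boom` traceback instead, and the temporary database directory is removed either way.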

@@ -112,7 +112,7 @@ class BlockchainReorganizationTests(CommandTestCase):
         # reorg the last block dropping our claim tx
         await self.blockchain.invalidate_block(invalidated_block_hash)
-        await self.blockchain.clear_mempool()
+        await self.conductor.clear_mempool()
         await self.blockchain.generate(2)

         # wait for the client to catch up and verify the reorg
@@ -191,7 +191,7 @@ class BlockchainReorganizationTests(CommandTestCase):
         # reorg the last block dropping our claim tx
         await self.blockchain.invalidate_block(invalidated_block_hash)
-        await self.blockchain.clear_mempool()
+        await self.conductor.clear_mempool()
         await self.blockchain.generate(2)

         # wait for the client to catch up and verify the reorg

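The reorg tests above now clear the mempool through the Conductor rather than through the blockchain node wrapper. The conductor-side implementation is not part of this diff; one plausible sketch, reusing the start/stop helpers that do appear in this changeset, would simply bounce both daemons so unconfirmed transactions are dropped (the method body and the cleanup keyword below are assumptions, not the repository's code):

async def clear_mempool(self):
    # hypothetical sketch only: restarting lbcd/lbcwallet discards unconfirmed
    # transactions, since lbcd does not persist its mempool across restarts
    await self.stop_lbcwallet(cleanup=False)  # assumed keyword; real signature may differ
    await self.stop_lbcd(cleanup=False)       # assumed keyword; real signature may differ
    await self.start_lbcd()
    await self.start_lbcwallet()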

@@ -135,7 +135,7 @@ class ReconnectTests(IntegrationTestCase):
         await self.conductor.spv_node.stop()
         self.assertFalse(self.ledger.network.is_connected)
         await asyncio.sleep(0.2)  # let it retry and fail once
-        await self.conductor.spv_node.start(self.conductor.blockchain_node)
+        await self.conductor.spv_node.start(self.conductor.lbcwallet_node)
         await self.ledger.network.on_connected.first
         self.assertTrue(self.ledger.network.is_connected)
@@ -165,8 +165,10 @@ class UDPServerFailDiscoveryTest(AsyncioTestCase):
     async def test_wallet_connects_despite_lack_of_udp(self):
         conductor = Conductor()
         conductor.spv_node.udp_port = '0'
-        await conductor.start_blockchain()
-        self.addCleanup(conductor.stop_blockchain)
+        await conductor.start_lbcd()
+        self.addCleanup(conductor.stop_lbcd)
+        await conductor.start_lbcwallet()
+        self.addCleanup(conductor.stop_lbcwallet)
         await conductor.start_spv()
         self.addCleanup(conductor.stop_spv)
         self.assertFalse(conductor.spv_node.server.bp.status_server.is_running)


@@ -48,7 +48,7 @@ class WalletCommands(CommandTestCase):
         self.assertEqual(status['wallet']['servers'][0]['port'], 50002)
         await self.conductor.spv_node.stop(True)
         self.conductor.spv_node.port = 54320
-        await self.conductor.spv_node.start(self.conductor.blockchain_node)
+        await self.conductor.spv_node.start(self.conductor.lbcwallet_node)
         status = await self.daemon.jsonrpc_status()
         self.assertEqual(len(status['wallet']['servers']), 0)
         self.daemon.jsonrpc_settings_set('lbryum_servers', ['localhost:54320'])
@@ -58,15 +58,15 @@ class WalletCommands(CommandTestCase):
         self.assertEqual(status['wallet']['servers'][0]['port'], 54320)

     async def test_sending_to_scripthash_address(self):
-        self.assertEqual(await self.blockchain.get_balance(), '95.99973580')
+        bal = await self.blockchain.get_balance()
         await self.assertBalance(self.account, '10.0')
         p2sh_address1 = await self.blockchain.get_new_address(self.blockchain.P2SH_SEGWIT_ADDRESS)
         tx = await self.account_send('2.0', p2sh_address1)
         self.assertEqual(tx['outputs'][0]['address'], p2sh_address1)
-        self.assertEqual(await self.blockchain.get_balance(), '98.99973580')  # +1 lbc for confirm block
+        self.assertEqual(await self.blockchain.get_balance(), str(float(bal)+3))  # +1 lbc for confirm block
         await self.assertBalance(self.account, '7.999877')
         await self.wallet_send('3.0', p2sh_address1)
-        self.assertEqual(await self.blockchain.get_balance(), '102.99973580')  # +1 lbc for confirm block
+        self.assertEqual(await self.blockchain.get_balance(), str(float(bal)+7))  # +1 lbc for confirm block
         await self.assertBalance(self.account, '4.999754')

     async def test_balance_caching(self):

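The hard-coded balances in test_sending_to_scripthash_address are replaced with deltas against the starting balance, which keeps the test independent of how many blocks the new setup has mined beforehand. The arithmetic behind the deltas: sending 2.0 LBC to an address the node itself controls and mining one confirmation block (1 LBC reward) raises the node balance by 3; the later 3.0 LBC send plus another block reward brings the cumulative change to 7, hence `str(float(bal)+3)` and `str(float(bal)+7)`.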

@@ -1553,7 +1553,7 @@ class StreamCommands(ClaimTestCase):
         )
         # test setting from env vars and starting from scratch
         await self.conductor.spv_node.stop(False)
-        await self.conductor.spv_node.start(self.conductor.blockchain_node,
+        await self.conductor.spv_node.start(self.conductor.lbcwallet_node,
                                             extraconf={'BLOCKING_CHANNEL_IDS': blocking_channel_id,
                                                        'FILTERING_CHANNEL_IDS': filtering_channel_id})
         await self.daemon.wallet_manager.reset()

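The resolve helpers below switch from lbrycrd's getvalueforname/getclaimbyid to lbcd's getclaimsforname, getclaimsfornamebyid, and getclaimsfornamebybid. A small mock of the response shape the assertions rely on, inferred from the diff itself rather than from lbcd documentation: field names are lowercase (claimid, txid, validatheight, effectiveamount), and lasttakeoverheight sits on the envelope, so the helpers copy it onto each claim before comparing against the hub database (all values below are placeholders):

response = {
    "lasttakeoverheight": 210,
    "claims": [{
        "claimid": "f0" * 20,        # lowercase keys, unlike lbrycrd's claimId/txId
        "txid": "ab" * 32,
        "n": 0,
        "amount": 100000,
        "effectiveamount": 100000,   # 0 would mean the claim is present but not active
        "validatheight": 211,
    }],
}
best = response["claims"][0]
best["lasttakeoverheight"] = response["lasttakeoverheight"]  # copied from the envelope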

@@ -31,13 +31,13 @@ class BaseResolveTestCase(CommandTestCase):
         self.assertEqual(claim_from_es['effective_amount'], claim_from_db.effective_amount)

     def assertMatchDBClaim(self, expected, claim):
-        self.assertEqual(expected['claimId'], claim.claim_hash.hex())
-        self.assertEqual(expected['validAtHeight'], claim.activation_height)
-        self.assertEqual(expected['lastTakeoverHeight'], claim.last_takeover_height)
-        self.assertEqual(expected['txId'], claim.tx_hash[::-1].hex())
+        self.assertEqual(expected['claimid'], claim.claim_hash.hex())
+        self.assertEqual(expected['validatheight'], claim.activation_height)
+        self.assertEqual(expected['lasttakeoverheight'], claim.last_takeover_height)
+        self.assertEqual(expected['txid'], claim.tx_hash[::-1].hex())
         self.assertEqual(expected['n'], claim.position)
         self.assertEqual(expected['amount'], claim.amount)
-        self.assertEqual(expected['effectiveAmount'], claim.effective_amount)
+        self.assertEqual(expected['effectiveamount'], claim.effective_amount)

     async def assertResolvesToClaimId(self, name, claim_id):
         other = await self.resolve(name)
@@ -53,9 +53,10 @@ class BaseResolveTestCase(CommandTestCase):
         self.assertEqual(claim_id, claim_from_es[0][0]['claim_hash'][::-1].hex())

     async def assertNoClaimForName(self, name: str):
-        lbrycrd_winning = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))
+        lbrycrd_winning = json.loads(await self.blockchain._cli_cmnd('getclaimsforname', name))
         stream, channel, _, _ = await self.conductor.spv_node.server.bp.db.resolve(name)
-        self.assertNotIn('claimId', lbrycrd_winning)
+        if 'claims' in lbrycrd_winning and lbrycrd_winning['claims'] is not None:
+            self.assertEqual(len(lbrycrd_winning['claims']), 0)
         if stream is not None:
             self.assertIsInstance(stream, LookupError)
         else:
@@ -63,20 +64,23 @@ class BaseResolveTestCase(CommandTestCase):
         claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(name=name)
         self.assertListEqual([], claim_from_es[0])

-    async def assertNoClaim(self, claim_id: str):
-        self.assertDictEqual(
-            {}, json.loads(await self.blockchain._cli_cmnd('getclaimbyid', claim_id))
-        )
+    async def assertNoClaim(self, name: str, claim_id: str):
+        expected = json.loads(await self.blockchain._cli_cmnd('getclaimsfornamebyid', name, '["' + claim_id + '"]'))
+        if 'claims' in expected and expected['claims'] is not None:
+            # ensure that if we do have the matching claim that it is not active
+            self.assertEqual(expected['claims'][0]['effectiveamount'], 0)
         claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(claim_id=claim_id)
         self.assertListEqual([], claim_from_es[0])
         claim = await self.conductor.spv_node.server.bp.db.fs_getclaimbyid(claim_id)
         self.assertIsNone(claim)

     async def assertMatchWinningClaim(self, name):
-        expected = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))
+        expected = json.loads(await self.blockchain._cli_cmnd('getclaimsfornamebybid', name, "[0]"))
         stream, channel, _, _ = await self.conductor.spv_node.server.bp.db.resolve(name)
         claim = stream if stream else channel
-        await self._assertMatchClaim(expected, claim)
+        expected['claims'][0]['lasttakeoverheight'] = expected['lasttakeoverheight']
+        await self._assertMatchClaim(expected['claims'][0], claim)
         return claim

     async def _assertMatchClaim(self, expected, claim):
@@ -86,28 +90,31 @@ class BaseResolveTestCase(CommandTestCase):
         )
         self.assertEqual(len(claim_from_es[0]), 1)
         self.assertMatchESClaim(claim_from_es[0][0], claim)
-        self._check_supports(claim.claim_hash.hex(), expected['supports'], claim_from_es[0][0]['support_amount'])
+        self._check_supports(claim.claim_hash.hex(), expected.get('supports', []),
+                             claim_from_es[0][0]['support_amount'], expected['effectiveamount'] > 0)

-    async def assertMatchClaim(self, claim_id, is_active_in_lbrycrd=True):
-        expected = json.loads(await self.blockchain._cli_cmnd('getclaimbyid', claim_id))
+    async def assertMatchClaim(self, name, claim_id, is_active_in_lbrycrd=True):
         claim = await self.conductor.spv_node.server.bp.db.fs_getclaimbyid(claim_id)
-        if is_active_in_lbrycrd:
-            if not expected:
-                self.assertIsNone(claim)
-                return
-            self.assertMatchDBClaim(expected, claim)
-        else:
-            self.assertDictEqual({}, expected)
         claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(
             claim_id=claim.claim_hash.hex()
         )
         self.assertEqual(len(claim_from_es[0]), 1)
         self.assertEqual(claim_from_es[0][0]['claim_hash'][::-1].hex(), claim.claim_hash.hex())
         self.assertMatchESClaim(claim_from_es[0][0], claim)
-        self._check_supports(
-            claim.claim_hash.hex(), expected.get('supports', []), claim_from_es[0][0]['support_amount'],
-            is_active_in_lbrycrd
-        )
+
+        expected = json.loads(await self.blockchain._cli_cmnd('getclaimsfornamebyid', name, '["' + claim_id + '"]'))
+        if is_active_in_lbrycrd:
+            if not expected:
+                self.assertIsNone(claim)
+                return
+            expected['claims'][0]['lasttakeoverheight'] = expected['lasttakeoverheight']
+            self.assertMatchDBClaim(expected['claims'][0], claim)
+            self._check_supports(claim.claim_hash.hex(), expected['claims'][0].get('supports', []),
+                                 claim_from_es[0][0]['support_amount'], is_active_in_lbrycrd)
+        else:
+            if 'claims' in expected and expected['claims'] is not None:
+                # ensure that if we do have the matching claim that it is not active
+                self.assertEqual(expected['claims'][0]['effectiveamount'], 0)
         return claim

     async def assertMatchClaimIsWinning(self, name, claim_id):
@@ -122,34 +129,31 @@ class BaseResolveTestCase(CommandTestCase):
             total_amount += amount
             if is_active_in_lbrycrd:
                 support = lbrycrd_supports[i]
-                self.assertEqual(support['txId'], db.prefix_db.tx_hash.get(tx_num, deserialize_value=False)[::-1].hex())
+                self.assertEqual(support['txid'], db.prefix_db.tx_hash.get(tx_num, deserialize_value=False)[::-1].hex())
                 self.assertEqual(support['n'], position)
                 self.assertEqual(support['height'], bisect_right(db.tx_counts, tx_num))
-                self.assertEqual(support['validAtHeight'], db.get_activation(tx_num, position, is_support=True))
+                self.assertEqual(support['validatheight'], db.get_activation(tx_num, position, is_support=True))
         self.assertEqual(total_amount, es_support_amount, f"lbrycrd support amount: {total_amount} vs es: {es_support_amount}")

     async def assertMatchClaimsForName(self, name):
-        expected = json.loads(await self.blockchain._cli_cmnd('getclaimsforname', name))
+        expected = json.loads(await self.blockchain._cli_cmnd('getclaimsforname', name, "", "true"))
         db = self.conductor.spv_node.server.bp.db
-        # self.assertEqual(len(expected['claims']), len(db_claims.claims))
-        # self.assertEqual(expected['lastTakeoverHeight'], db_claims.lastTakeoverHeight)
-        last_takeover = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))['lastTakeoverHeight']
         for c in expected['claims']:
-            c['lastTakeoverHeight'] = last_takeover
-            claim_id = c['claimId']
+            c['lasttakeoverheight'] = expected['lasttakeoverheight']
+            claim_id = c['claimid']
             claim_hash = bytes.fromhex(claim_id)
             claim = db._fs_get_claim_by_hash(claim_hash)
             self.assertMatchDBClaim(c, claim)
             claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(
-                claim_id=c['claimId']
+                claim_id=claim_id
             )
             self.assertEqual(len(claim_from_es[0]), 1)
-            self.assertEqual(claim_from_es[0][0]['claim_hash'][::-1].hex(), c['claimId'])
+            self.assertEqual(claim_from_es[0][0]['claim_hash'][::-1].hex(), claim_id)
             self.assertMatchESClaim(claim_from_es[0][0], claim)
-            self._check_supports(c['claimId'], c['supports'], claim_from_es[0][0]['support_amount'])
+            self._check_supports(claim_id, c.get('supports', []),
+                                 claim_from_es[0][0]['support_amount'], c['effectiveamount'] > 0)

 class ResolveCommand(BaseResolveTestCase):
@@ -262,18 +266,18 @@ class ResolveCommand(BaseResolveTestCase):
         self.assertEqual(claim['confirmations'], json.loads(tx_details)['confirmations'])

         # resolve handles invalid data
-        await self.blockchain_claim_name("gibberish", hexlify(b"{'invalid':'json'}").decode(), "0.1")
-        await self.generate(1)
-        response = await self.out(self.daemon.jsonrpc_resolve("lbry://gibberish"))
-        self.assertSetEqual({'lbry://gibberish'}, set(response))
-        claim = response['lbry://gibberish']
-        self.assertEqual(claim['name'], 'gibberish')
-        self.assertNotIn('value', claim)
+        # await self.blockchain_claim_name("gibberish", hexlify(b"{'invalid':'json'}").decode(), "0.1")
+        # await self.generate(1)
+        # response = await self.out(self.daemon.jsonrpc_resolve("lbry://gibberish"))
+        # self.assertSetEqual({'lbry://gibberish'}, set(response))
+        # claim = response['lbry://gibberish']
+        # self.assertEqual(claim['name'], 'gibberish')
+        # self.assertNotIn('value', claim)

         # resolve retries
         await self.conductor.spv_node.stop()
         resolve_task = asyncio.create_task(self.resolve('foo'))
-        await self.conductor.spv_node.start(self.conductor.blockchain_node)
+        await self.conductor.spv_node.start(self.conductor.lbcwallet_node)
         self.assertIsNotNone((await resolve_task)['claim_id'])

     async def test_winning_by_effective_amount(self):
@@ -646,7 +650,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
         self.assertEqual(height, self.conductor.spv_node.server.bp.db.db_height)
         await self.assertMatchClaimIsWinning(name, winning_claim_id)
         for non_winning in non_winning_claims:
-            claim = await self.assertMatchClaim(
+            claim = await self.assertMatchClaim(name,
                 non_winning.claim_id, is_active_in_lbrycrd=non_winning.active_in_lbrycrd
             )
             self.assertEqual(non_winning.activation_height, claim.activation_height)
@@ -1334,7 +1338,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
         await self.generate(1)
-        await self.assertMatchClaim(first_claim_id)
+        await self.assertMatchClaim(name, first_claim_id)
         await self.assertMatchClaimIsWinning(name, second_claim_id)

     async def test_remove_controlling_support(self):
@@ -1405,12 +1409,12 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
         await self.generate(32)
         second_claim_id = (await self.stream_create(name, '0.01', allow_duplicate_name=True))['outputs'][0]['claim_id']
-        await self.assertNoClaim(second_claim_id)
+        await self.assertNoClaim(name, second_claim_id)
         self.assertEqual(
             len((await self.conductor.spv_node.server.bp.db.search_index.search(claim_name=name))[0]), 1
         )
         await self.generate(1)
-        await self.assertMatchClaim(second_claim_id)
+        await self.assertMatchClaim(name, second_claim_id)
         self.assertEqual(
             len((await self.conductor.spv_node.server.bp.db.search_index.search(claim_name=name))[0]), 2
         )
@@ -1570,7 +1574,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
         # reorg the last block dropping our claim tx
         await self.blockchain.invalidate_block(invalidated_block_hash)
-        await self.blockchain.clear_mempool()
+        await self.conductor.clear_mempool()
         await self.blockchain.generate(2)

         # wait for the client to catch up and verify the reorg
@@ -1649,7 +1653,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
         # reorg the last block dropping our claim tx
         await self.blockchain.invalidate_block(invalidated_block_hash)
-        await self.blockchain.clear_mempool()
+        await self.conductor.clear_mempool()
         await self.blockchain.generate(2)

         # wait for the client to catch up and verify the reorg


@@ -60,7 +60,7 @@ class TestSegwit(CommandTestCase):
         tx = await self.blockchain.create_raw_transaction([
                 {"txid": p2sh_txid1, "vout": 0},
                 {"txid": bech32_txid1, "vout": 0},
-            ], [{p2sh_address3: '1.9'}]
+            ], {p2sh_address3: 1.9}
         )
         tx = await self.blockchain.sign_raw_transaction_with_wallet(tx)
         p2sh_txid3 = await self.blockchain.send_raw_transaction(tx)
@@ -71,7 +71,7 @@ class TestSegwit(CommandTestCase):
         tx = await self.blockchain.create_raw_transaction([
                 {"txid": p2sh_txid2, "vout": 0},
                 {"txid": bech32_txid2, "vout": 0},
-            ], [{bech32_address3: '1.9'}]
+            ], {bech32_address3: 1.9}
         )
         tx = await self.blockchain.sign_raw_transaction_with_wallet(tx)
         bech32_txid3 = await self.blockchain.send_raw_transaction(tx)
@@ -83,7 +83,7 @@ class TestSegwit(CommandTestCase):
         tx = await self.blockchain.create_raw_transaction([
                 {"txid": p2sh_txid3, "vout": 0},
                 {"txid": bech32_txid3, "vout": 0},
-            ], [{address: '3.5'}]
+            ], {address: 3.5}
         )
         tx = await self.blockchain.sign_raw_transaction_with_wallet(tx)
         txid = await self.blockchain.send_raw_transaction(tx)
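
One detail worth calling out in the create_raw_transaction hunks above: the outputs argument changes from a list of single-entry dicts with string amounts to one dict mapping each address to a numeric amount. A tiny before/after sketch (the address variable is a stand-in, as in the test itself):

address = "placeholder-address"
outputs_old = [{address: '1.9'}]   # lbrycrd-era shape: list of dicts, string amounts
outputs_new = {address: 1.9}       # shape used with lbcd: address -> numeric amount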