Compare commits

...
Sign in to create a new pull request.

28 commits

Author SHA1 Message Date
Jeffrey Picard
2c0e4e88ba asdf 2022-01-15 20:34:52 +00:00
Jeffrey Picard
07b81f6cb3 fix 2022-01-15 19:31:05 +00:00
Victor Shyba
04b3967115 bump lbcd to valid latest version 2022-01-15 09:29:21 +00:00
Brannon King
9c13030a15 get better error on RPC failure 2022-01-15 09:29:21 +00:00
Brannon King
cba664424f fixing stalls in tests 2022-01-15 09:29:19 +00:00
Brannon King
49db11eaa6 fix for the send-to-address timeout in asyncSetup 2022-01-15 09:27:40 +00:00
Roy Lee
e9466721c5 Update lbcd and lbcwallet versions 2022-01-15 09:27:40 +00:00
Brannon King
a2f2010647 fixed some tests; made them not timeout waiting for tx confirmation 2022-01-15 09:27:40 +00:00
Roy Lee
d1601a5c33 update lbcd to v0.22.100-beta-rc5 2022-01-15 09:27:40 +00:00
Victor Shyba
f59feb20e5 skip test_segwit while we take a look why lbcwallet rejects the tx 2022-01-15 09:27:40 +00:00
Victor Shyba
10d1ff7d3b ignore the config line warning 2022-01-15 09:27:40 +00:00
Victor Shyba
335d9c5c2b add comment for temporary lbcd edge case 2022-01-15 09:27:40 +00:00
Victor Shyba
35de3678b4 skip the test that uses claimname/updateclaim for now 2022-01-15 09:27:40 +00:00
Victor Shyba
b94a9e5dc4 separate stdout/stderr from lbcctl on tests 2022-01-15 09:27:40 +00:00
Roy Lee
fde715b259 update lbcd and lbcwallet veresion 2022-01-15 09:27:40 +00:00
Brannon King
ac6c1002f1 fixing a variety of broken tests, updated getclaimbyid usage 2022-01-15 09:27:40 +00:00
Roy Lee
982c6a13ca test: lbcd don't have a dedicated mempool.dat 2022-01-15 09:27:40 +00:00
Roy Lee
0e44faadf6 test: update lbcd to avoid txn being rejected due to munimum fee 2022-01-15 09:27:40 +00:00
Roy Lee
2ca0728807 HACK: temporary hack to move things along 2022-01-15 09:27:40 +00:00
Roy Lee
087ae9e09a add a rocksdb setup sanity check 2022-01-15 09:27:40 +00:00
Roy Lee
372eafdcb2 test: migrate from lbrycrd to lbcd/lbcwallet 2022-01-15 09:27:34 +00:00
Roy Lee
2381554f8f test: support generatetoaddress RPC 2022-01-15 09:23:28 +00:00
Roy Lee
75b055a2e9 test: support walletpassphrase RPC 2022-01-15 09:23:28 +00:00
Roy Lee
e3240b61dc test: getnewaddress RPC takes account name 2022-01-15 09:23:28 +00:00
Roy Lee
d200c847f0 test: update some RPC arguments to int type 2022-01-15 09:23:28 +00:00
Roy Lee
ffa1d00564 test: setup loggers first 2022-01-15 09:23:28 +00:00
Roy Lee
aafa1e9613 Update log level from WARN to INFO 2022-01-15 09:23:28 +00:00
Jack Robison
1992b83faf use rocksdb instead of leveldb
-dont use block processor directly from session manager
2022-01-15 09:23:28 +00:00
29 changed files with 811 additions and 387 deletions

View file

@ -0,0 +1,36 @@
import os
import plyvel
from lbry.wallet.server.db.db import RocksDBStore
def main(db_dir: str):
old_path = os.path.join(db_dir, 'lbry-leveldb')
new_path = os.path.join(db_dir, 'lbry-rocksdb')
old_db = plyvel.DB(
old_path, create_if_missing=True, max_open_files=256,
write_buffer_size=64 * 1024 * 1024,
max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
)
new_db = RocksDBStore(new_path, 64, 256)
try:
batch = []
append_batch = batch.append
cnt = 0
for k, v in old_db.iterator():
append_batch((k, v))
cnt += 1
if cnt % 100_000 == 0:
with new_db.write_batch() as batch_write:
batch_put = batch_write.put
for item in batch:
batch_put(*item)
batch.clear()
print(f"flushed {cnt} key/value items")
finally:
old_db.close()
new_db.close()
if __name__ == "__main__":
main('/mnt/sdb/wallet_server/_data/')

View file

@ -15,7 +15,6 @@ RUN apt-get update && \
build-essential \ build-essential \
automake libtool \ automake libtool \
pkg-config \ pkg-config \
libleveldb-dev \
python3.7 \ python3.7 \
python3-dev \ python3-dev \
python3-pip \ python3-pip \

View file

@ -0,0 +1,61 @@
# FROM debian:10-slim
FROM python:3.7.12-slim-buster
ARG user=lbry
ARG db_dir=/database
ARG projects_dir=/home/$user
ARG DOCKER_TAG
ARG DOCKER_COMMIT=docker
ENV DOCKER_TAG=$DOCKER_TAG DOCKER_COMMIT=$DOCKER_COMMIT
RUN apt-get update && \
apt-get -y --no-install-recommends install \
wget \
tar unzip \
build-essential \
automake libtool \
pkg-config \
librocksdb-dev
# python3.7 \
# python3-dev \
# python3-pip \
# python3-wheel \
# python3-cffi \
# python3-setuptools && \
# update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 && \
# rm -rf /var/lib/apt/lists/*
RUN groupadd -g 999 $user && useradd -m -u 999 -g $user $user
RUN mkdir -p $db_dir
RUN chown -R $user:$user $db_dir
COPY . $projects_dir
RUN chown -R $user:$user $projects_dir
USER $user
ENV PATH=/home/lbry/.local/bin:$PATH
WORKDIR $projects_dir
RUN python -m pip install --upgrade pip
RUN pip install lbry-rocksdb
RUN pip install uvloop
RUN make install
RUN python3 docker/set_build.py
RUN rm ~/.cache -rf
# entry point
ARG host=0.0.0.0
ARG tcp_port=50001
ARG daemon_url=https://lbry:lbry@192.99.151.178:9245/
VOLUME $db_dir
ENV TCP_PORT=$tcp_port
ENV HOST=$host
ENV DAEMON_URL=$daemon_url
ENV DB_DIRECTORY=$db_dir
ENV MAX_SESSIONS=1000000000
ENV MAX_SEND=1000000000000000000
ENV EVENT_LOOP_POLICY=uvloop
COPY ./docker/wallet_server_entrypoint.sh /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]

View file

@ -19,7 +19,7 @@ from lbry.conf import Config
from lbry.wallet.util import satoshis_to_coins from lbry.wallet.util import satoshis_to_coins
from lbry.wallet.dewies import lbc_to_dewies from lbry.wallet.dewies import lbc_to_dewies
from lbry.wallet.orchstr8 import Conductor from lbry.wallet.orchstr8 import Conductor
from lbry.wallet.orchstr8.node import BlockchainNode, WalletNode, HubNode from lbry.wallet.orchstr8.node import LBCWalletNode, WalletNode, HubNode
from lbry.schema.claim import Claim from lbry.schema.claim import Claim
from lbry.extras.daemon.daemon import Daemon, jsonrpc_dumps_pretty from lbry.extras.daemon.daemon import Daemon, jsonrpc_dumps_pretty
@ -230,7 +230,7 @@ class IntegrationTestCase(AsyncioTestCase):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.conductor: Optional[Conductor] = None self.conductor: Optional[Conductor] = None
self.blockchain: Optional[BlockchainNode] = None self.blockchain: Optional[LBCWalletNode] = None
self.hub: Optional[HubNode] = None self.hub: Optional[HubNode] = None
self.wallet_node: Optional[WalletNode] = None self.wallet_node: Optional[WalletNode] = None
self.manager: Optional[WalletManager] = None self.manager: Optional[WalletManager] = None
@ -240,15 +240,17 @@ class IntegrationTestCase(AsyncioTestCase):
async def asyncSetUp(self): async def asyncSetUp(self):
self.conductor = Conductor(seed=self.SEED) self.conductor = Conductor(seed=self.SEED)
await self.conductor.start_blockchain() await self.conductor.start_lbcd()
self.addCleanup(self.conductor.stop_blockchain) self.addCleanup(self.conductor.stop_lbcd)
await self.conductor.start_lbcwallet()
self.addCleanup(self.conductor.stop_lbcwallet)
await self.conductor.start_spv() await self.conductor.start_spv()
self.addCleanup(self.conductor.stop_spv) self.addCleanup(self.conductor.stop_spv)
await self.conductor.start_wallet() await self.conductor.start_wallet()
self.addCleanup(self.conductor.stop_wallet) self.addCleanup(self.conductor.stop_wallet)
await self.conductor.start_hub() await self.conductor.start_hub()
self.addCleanup(self.conductor.stop_hub) self.addCleanup(self.conductor.stop_hub)
self.blockchain = self.conductor.blockchain_node self.blockchain = self.conductor.lbcwallet_node
self.hub = self.conductor.hub_node self.hub = self.conductor.hub_node
self.wallet_node = self.conductor.wallet_node self.wallet_node = self.conductor.wallet_node
self.manager = self.wallet_node.manager self.manager = self.wallet_node.manager
@ -263,6 +265,13 @@ class IntegrationTestCase(AsyncioTestCase):
def broadcast(self, tx): def broadcast(self, tx):
return self.ledger.broadcast(tx) return self.ledger.broadcast(tx)
async def broadcast_and_confirm(self, tx, ledger=None):
ledger = ledger or self.ledger
notifications = asyncio.create_task(ledger.wait(tx))
await ledger.broadcast(tx)
await notifications
await self.generate_and_wait(1, [tx.id], ledger)
async def on_header(self, height): async def on_header(self, height):
if self.ledger.headers.height < height: if self.ledger.headers.height < height:
await self.ledger.on_header.where( await self.ledger.on_header.where(
@ -270,11 +279,29 @@ class IntegrationTestCase(AsyncioTestCase):
) )
return True return True
def on_transaction_id(self, txid, ledger=None): async def send_to_address_and_wait(self, address, amount, blocks_to_generate=0, ledger=None):
return (ledger or self.ledger).on_transaction.where( tx_watch = []
lambda e: e.tx.id == txid txid = None
done = False
watcher = (ledger or self.ledger).on_transaction.where(
lambda e: e.tx.id == txid or done or tx_watch.append(e.tx.id)
) )
txid = await self.blockchain.send_to_address(address, amount)
done = txid in tx_watch
await watcher
await self.generate_and_wait(blocks_to_generate, [txid], ledger)
return txid
async def generate_and_wait(self, blocks_to_generate, txids, ledger=None):
if blocks_to_generate > 0:
watcher = (ledger or self.ledger).on_transaction.where(
lambda e: ((e.tx.id in txids and txids.remove(e.tx.id)), len(txids) <= 0)[-1] # multi-statement lambda
)
await self.blockchain.generate(blocks_to_generate)
await watcher
def on_address_update(self, address): def on_address_update(self, address):
return self.ledger.on_transaction.where( return self.ledger.on_transaction.where(
lambda e: e.address == address lambda e: e.address == address
@ -328,7 +355,7 @@ class ExchangeRateManagerComponent(Component):
class CommandTestCase(IntegrationTestCase): class CommandTestCase(IntegrationTestCase):
VERBOSITY = logging.WARN VERBOSITY = logging.INFO
blob_lru_cache_size = 0 blob_lru_cache_size = 0
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
@ -345,20 +372,19 @@ class CommandTestCase(IntegrationTestCase):
self.skip_libtorrent = True self.skip_libtorrent = True
async def asyncSetUp(self): async def asyncSetUp(self):
await super().asyncSetUp()
logging.getLogger('lbry.blob_exchange').setLevel(self.VERBOSITY) logging.getLogger('lbry.blob_exchange').setLevel(self.VERBOSITY)
logging.getLogger('lbry.daemon').setLevel(self.VERBOSITY) logging.getLogger('lbry.daemon').setLevel(self.VERBOSITY)
logging.getLogger('lbry.stream').setLevel(self.VERBOSITY) logging.getLogger('lbry.stream').setLevel(self.VERBOSITY)
logging.getLogger('lbry.wallet').setLevel(self.VERBOSITY) logging.getLogger('lbry.wallet').setLevel(self.VERBOSITY)
await super().asyncSetUp()
self.daemon = await self.add_daemon(self.wallet_node) self.daemon = await self.add_daemon(self.wallet_node)
await self.account.ensure_address_gap() await self.account.ensure_address_gap()
address = (await self.account.receiving.get_addresses(limit=1, only_usable=True))[0] address = (await self.account.receiving.get_addresses(limit=1, only_usable=True))[0]
sendtxid = await self.blockchain.send_to_address(address, 10) await self.send_to_address_and_wait(address, 10, 6)
await self.confirm_tx(sendtxid)
await self.generate(5)
server_tmp_dir = tempfile.mkdtemp() server_tmp_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, server_tmp_dir) self.addCleanup(shutil.rmtree, server_tmp_dir)
@ -455,9 +481,8 @@ class CommandTestCase(IntegrationTestCase):
async def confirm_tx(self, txid, ledger=None): async def confirm_tx(self, txid, ledger=None):
""" Wait for tx to be in mempool, then generate a block, wait for tx to be in a block. """ """ Wait for tx to be in mempool, then generate a block, wait for tx to be in a block. """
await self.on_transaction_id(txid, ledger) # actually, if it's in the mempool or in the block we're fine
await self.generate(1) await self.generate_and_wait(1, [txid], ledger=ledger)
await self.on_transaction_id(txid, ledger)
return txid return txid
async def on_transaction_dict(self, tx): async def on_transaction_dict(self, tx):

View file

@ -1,8 +1,13 @@
__node_daemon__ = 'lbrycrdd' __lbcd__ = 'lbcd'
__node_cli__ = 'lbrycrd-cli' __lbcctl__ = 'lbcctl'
__node_bin__ = '' __lbcwallet__ = 'lbcwallet'
__node_url__ = ( __lbcd_url__ = (
'https://github.com/lbryio/lbrycrd/releases/download/v0.17.4.6/lbrycrd-linux-1746.zip' 'https://github.com/lbryio/lbcd/releases/download/' +
'v0.22.200-beta/lbcd_0.22.200-beta_TARGET_PLATFORM.tar.gz'
)
__lbcwallet_url__ = (
'https://github.com/lbryio/lbcwallet/releases/download/' +
'v0.13.100-alpha-rc2/lbcwallet_0.13.100-alpha-rc2_TARGET_PLATFORM.tar.gz'
) )
__spvserver__ = 'lbry.wallet.server.coin.LBCRegTest' __spvserver__ = 'lbry.wallet.server.coin.LBCRegTest'

View file

@ -365,6 +365,10 @@ class Ledger(metaclass=LedgerRegistry):
await self.db.close() await self.db.close()
await self.headers.close() await self.headers.close()
async def tasks_are_done(self):
await self._update_tasks.done.wait()
await self._other_tasks.done.wait()
@property @property
def local_height_including_downloaded_height(self): def local_height_including_downloaded_height(self):
return max(self.headers.height, self._download_height) return max(self.headers.height, self._download_height)

View file

@ -5,7 +5,9 @@ import aiohttp
from lbry import wallet from lbry import wallet
from lbry.wallet.orchstr8.node import ( from lbry.wallet.orchstr8.node import (
Conductor, get_blockchain_node_from_ledger Conductor,
get_lbcd_node_from_ledger,
get_lbcwallet_node_from_ledger
) )
from lbry.wallet.orchstr8.service import ConductorService from lbry.wallet.orchstr8.service import ConductorService
@ -16,10 +18,11 @@ def get_argument_parser():
) )
subparsers = parser.add_subparsers(dest='command', help='sub-command help') subparsers = parser.add_subparsers(dest='command', help='sub-command help')
subparsers.add_parser("download", help="Download blockchain node binary.") subparsers.add_parser("download", help="Download lbcd and lbcwallet node binaries.")
start = subparsers.add_parser("start", help="Start orchstr8 service.") start = subparsers.add_parser("start", help="Start orchstr8 service.")
start.add_argument("--blockchain", help="Hostname to start blockchain node.") start.add_argument("--lbcd", help="Hostname to start lbcd node.")
start.add_argument("--lbcwallet", help="Hostname to start lbcwallet node.")
start.add_argument("--spv", help="Hostname to start SPV server.") start.add_argument("--spv", help="Hostname to start SPV server.")
start.add_argument("--wallet", help="Hostname to start wallet daemon.") start.add_argument("--wallet", help="Hostname to start wallet daemon.")
@ -47,7 +50,8 @@ def main():
if command == 'download': if command == 'download':
logging.getLogger('blockchain').setLevel(logging.INFO) logging.getLogger('blockchain').setLevel(logging.INFO)
get_blockchain_node_from_ledger(wallet).ensure() get_lbcd_node_from_ledger(wallet).ensure()
get_lbcwallet_node_from_ledger(wallet).ensure()
elif command == 'generate': elif command == 'generate':
loop.run_until_complete(run_remote_command( loop.run_until_complete(run_remote_command(
@ -57,9 +61,12 @@ def main():
elif command == 'start': elif command == 'start':
conductor = Conductor() conductor = Conductor()
if getattr(args, 'blockchain', False): if getattr(args, 'lbcd', False):
conductor.blockchain_node.hostname = args.blockchain conductor.lbcd_node.hostname = args.lbcd
loop.run_until_complete(conductor.start_blockchain()) loop.run_until_complete(conductor.start_lbcd())
if getattr(args, 'lbcwallet', False):
conductor.lbcwallet_node.hostname = args.lbcwallet
loop.run_until_complete(conductor.start_lbcwallet())
if getattr(args, 'spv', False): if getattr(args, 'spv', False):
conductor.spv_node.hostname = args.spv conductor.spv_node.hostname = args.spv
loop.run_until_complete(conductor.start_spv()) loop.run_until_complete(conductor.start_spv())

View file

@ -8,6 +8,7 @@ import logging
import tempfile import tempfile
import subprocess import subprocess
import importlib import importlib
import platform
from distutils.util import strtobool from distutils.util import strtobool
from binascii import hexlify from binascii import hexlify
@ -31,11 +32,18 @@ def get_spvserver_from_ledger(ledger_module):
return getattr(spvserver_module, regtest_class_name) return getattr(spvserver_module, regtest_class_name)
def get_blockchain_node_from_ledger(ledger_module): def get_lbcd_node_from_ledger(ledger_module):
return BlockchainNode( return LBCDNode(
ledger_module.__node_url__, ledger_module.__lbcd_url__,
os.path.join(ledger_module.__node_bin__, ledger_module.__node_daemon__), ledger_module.__lbcd__,
os.path.join(ledger_module.__node_bin__, ledger_module.__node_cli__) ledger_module.__lbcctl__
)
def get_lbcwallet_node_from_ledger(ledger_module):
return LBCWalletNode(
ledger_module.__lbcwallet_url__,
ledger_module.__lbcwallet__,
ledger_module.__lbcctl__
) )
@ -45,51 +53,52 @@ class Conductor:
self.manager_module = WalletManager self.manager_module = WalletManager
self.spv_module = get_spvserver_from_ledger(lbry.wallet) self.spv_module = get_spvserver_from_ledger(lbry.wallet)
self.blockchain_node = get_blockchain_node_from_ledger(lbry.wallet) self.lbcd_node = get_lbcd_node_from_ledger(lbry.wallet)
self.lbcwallet_node = get_lbcwallet_node_from_ledger(lbry.wallet)
self.spv_node = SPVNode(self.spv_module) self.spv_node = SPVNode(self.spv_module)
self.wallet_node = WalletNode( self.wallet_node = WalletNode(
self.manager_module, RegTestLedger, default_seed=seed self.manager_module, RegTestLedger, default_seed=seed
) )
self.hub_node = HubNode(__hub_url__, "hub", self.spv_node) self.hub_node = HubNode(__hub_url__, "hub", self.spv_node)
self.blockchain_started = False self.lbcd_started = False
self.lbcwallet_started = False
self.spv_started = False self.spv_started = False
self.wallet_started = False self.wallet_started = False
self.hub_started = False self.hub_started = False
self.log = log.getChild('conductor') self.log = log.getChild('conductor')
async def start_blockchain(self): async def start_lbcd(self):
if not self.blockchain_started: if not self.lbcd_started:
asyncio.create_task(self.blockchain_node.start()) asyncio.create_task(self.lbcd_node.start())
await self.blockchain_node.running.wait() await self.lbcd_node.running.wait()
await self.blockchain_node.generate(200) self.lbcd_started = True
self.blockchain_started = True
async def stop_blockchain(self): async def stop_lbcd(self, cleanup=True):
if self.blockchain_started: if self.lbcd_started:
await self.blockchain_node.stop(cleanup=True) await self.lbcd_node.stop(cleanup)
self.blockchain_started = False self.lbcd_started = False
async def start_hub(self): async def start_hub(self):
if not self.hub_started: if not self.hub_started:
asyncio.create_task(self.hub_node.start()) asyncio.create_task(self.hub_node.start())
await self.blockchain_node.running.wait() await self.lbcwallet_node.running.wait()
self.hub_started = True self.hub_started = True
async def stop_hub(self): async def stop_hub(self, cleanup=True):
if self.hub_started: if self.hub_started:
await self.hub_node.stop(cleanup=True) await self.hub_node.stop(cleanup)
self.hub_started = False self.hub_started = False
async def start_spv(self): async def start_spv(self):
if not self.spv_started: if not self.spv_started:
await self.spv_node.start(self.blockchain_node) await self.spv_node.start(self.lbcwallet_node)
self.spv_started = True self.spv_started = True
async def stop_spv(self): async def stop_spv(self, cleanup=True):
if self.spv_started: if self.spv_started:
await self.spv_node.stop(cleanup=True) await self.spv_node.stop(cleanup)
self.spv_started = False self.spv_started = False
async def start_wallet(self): async def start_wallet(self):
@ -97,13 +106,31 @@ class Conductor:
await self.wallet_node.start(self.spv_node) await self.wallet_node.start(self.spv_node)
self.wallet_started = True self.wallet_started = True
async def stop_wallet(self): async def stop_wallet(self, cleanup=True):
if self.wallet_started: if self.wallet_started:
await self.wallet_node.stop(cleanup=True) await self.wallet_node.stop(cleanup)
self.wallet_started = False self.wallet_started = False
async def start_lbcwallet(self, clean=True):
if not self.lbcwallet_started:
asyncio.create_task(self.lbcwallet_node.start())
await self.lbcwallet_node.running.wait()
if clean:
mining_addr = await self.lbcwallet_node.get_new_address()
self.lbcwallet_node.mining_addr = mining_addr
await self.lbcwallet_node.generate(200)
# unlock the wallet for the next 1 hour
await self.lbcwallet_node.wallet_passphrase("password", 3600)
self.lbcwallet_started = True
async def stop_lbcwallet(self, cleanup=True):
if self.lbcwallet_started:
await self.lbcwallet_node.stop(cleanup)
self.lbcwallet_started = False
async def start(self): async def start(self):
await self.start_blockchain() await self.start_lbcd()
await self.start_lbcwallet()
await self.start_spv() await self.start_spv()
await self.start_wallet() await self.start_wallet()
@ -111,7 +138,8 @@ class Conductor:
all_the_stops = [ all_the_stops = [
self.stop_wallet, self.stop_wallet,
self.stop_spv, self.stop_spv,
self.stop_blockchain self.stop_lbcwallet,
self.stop_lbcd
] ]
for stop in all_the_stops: for stop in all_the_stops:
try: try:
@ -119,6 +147,11 @@ class Conductor:
except Exception as e: except Exception as e:
log.exception('Exception raised while stopping services:', exc_info=e) log.exception('Exception raised while stopping services:', exc_info=e)
async def clear_mempool(self):
await self.stop_lbcwallet(cleanup=False)
await self.stop_lbcd(cleanup=False)
await self.start_lbcd()
await self.start_lbcwallet(clean=False)
class WalletNode: class WalletNode:
@ -139,8 +172,9 @@ class WalletNode:
async def start(self, spv_node: 'SPVNode', seed=None, connect=True, config=None): async def start(self, spv_node: 'SPVNode', seed=None, connect=True, config=None):
wallets_dir = os.path.join(self.data_path, 'wallets') wallets_dir = os.path.join(self.data_path, 'wallets')
os.mkdir(wallets_dir)
wallet_file_name = os.path.join(wallets_dir, 'my_wallet.json') wallet_file_name = os.path.join(wallets_dir, 'my_wallet.json')
if not os.path.isdir(wallets_dir):
os.mkdir(wallets_dir)
with open(wallet_file_name, 'w') as wallet_file: with open(wallet_file_name, 'w') as wallet_file:
wallet_file.write('{"version": 1, "accounts": []}\n') wallet_file.write('{"version": 1, "accounts": []}\n')
self.manager = self.manager_class.from_config({ self.manager = self.manager_class.from_config({
@ -198,14 +232,14 @@ class SPVNode:
self.stopped = False self.stopped = False
self.index_name = uuid4().hex self.index_name = uuid4().hex
async def start(self, blockchain_node: 'BlockchainNode', extraconf=None): async def start(self, lbcwallet_node: 'LBCWalletNode', extraconf=None):
self.data_path = tempfile.mkdtemp() self.data_path = tempfile.mkdtemp()
conf = { conf = {
'DESCRIPTION': '', 'DESCRIPTION': '',
'PAYMENT_ADDRESS': '', 'PAYMENT_ADDRESS': '',
'DAILY_FEE': '0', 'DAILY_FEE': '0',
'DB_DIRECTORY': self.data_path, 'DB_DIRECTORY': self.data_path,
'DAEMON_URL': blockchain_node.rpc_url, 'DAEMON_URL': lbcwallet_node.rpc_url,
'REORG_LIMIT': '100', 'REORG_LIMIT': '100',
'HOST': self.hostname, 'HOST': self.hostname,
'TCP_PORT': str(self.port), 'TCP_PORT': str(self.port),
@ -240,18 +274,19 @@ class SPVNode:
shutil.rmtree(self.data_path, ignore_errors=True) shutil.rmtree(self.data_path, ignore_errors=True)
class BlockchainProcess(asyncio.SubprocessProtocol): class LBCDProcess(asyncio.SubprocessProtocol):
IGNORE_OUTPUT = [ IGNORE_OUTPUT = [
b'keypool keep', b'keypool keep',
b'keypool reserve', b'keypool reserve',
b'keypool return', b'keypool return',
b'Block submitted',
] ]
def __init__(self): def __init__(self):
self.ready = asyncio.Event() self.ready = asyncio.Event()
self.stopped = asyncio.Event() self.stopped = asyncio.Event()
self.log = log.getChild('blockchain') self.log = log.getChild('lbcd')
def pipe_data_received(self, fd, data): def pipe_data_received(self, fd, data):
if self.log and not any(ignore in data for ignore in self.IGNORE_OUTPUT): if self.log and not any(ignore in data for ignore in self.IGNORE_OUTPUT):
@ -262,7 +297,7 @@ class BlockchainProcess(asyncio.SubprocessProtocol):
if b'Error:' in data: if b'Error:' in data:
self.ready.set() self.ready.set()
raise SystemError(data.decode()) raise SystemError(data.decode())
if b'Done loading' in data: if b'RPCS: RPC server listening on' in data:
self.ready.set() self.ready.set()
def process_exited(self): def process_exited(self):
@ -270,10 +305,34 @@ class BlockchainProcess(asyncio.SubprocessProtocol):
self.ready.set() self.ready.set()
class BlockchainNode: class WalletProcess(asyncio.SubprocessProtocol):
P2SH_SEGWIT_ADDRESS = "p2sh-segwit" IGNORE_OUTPUT = [
BECH32_ADDRESS = "bech32" ]
def __init__(self):
self.ready = asyncio.Event()
self.stopped = asyncio.Event()
self.log = log.getChild('lbcwallet')
self.transport: Optional[asyncio.transports.SubprocessTransport] = None
def pipe_data_received(self, fd, data):
if self.log and not any(ignore in data for ignore in self.IGNORE_OUTPUT):
if b'Error:' in data:
self.log.error(data.decode())
else:
self.log.info(data.decode())
if b'Error:' in data:
self.ready.set()
raise SystemError(data.decode())
if b'WLLT: Finished rescan' in data:
self.ready.set()
def process_exited(self):
self.stopped.set()
self.ready.set()
class LBCDNode:
def __init__(self, url, daemon, cli): def __init__(self, url, daemon, cli):
self.latest_release_url = url self.latest_release_url = url
@ -281,28 +340,22 @@ class BlockchainNode:
self.bin_dir = os.path.join(self.project_dir, 'bin') self.bin_dir = os.path.join(self.project_dir, 'bin')
self.daemon_bin = os.path.join(self.bin_dir, daemon) self.daemon_bin = os.path.join(self.bin_dir, daemon)
self.cli_bin = os.path.join(self.bin_dir, cli) self.cli_bin = os.path.join(self.bin_dir, cli)
self.log = log.getChild('blockchain') self.log = log.getChild('lbcd')
self.data_path = None self.data_path = tempfile.mkdtemp()
self.protocol = None self.protocol = None
self.transport = None self.transport = None
self.block_expected = 0
self.hostname = 'localhost' self.hostname = 'localhost'
self.peerport = 9246 + 2 # avoid conflict with default peer port self.peerport = 29246
self.rpcport = 9245 + 2 # avoid conflict with default rpc port self.rpcport = 29245
self.rpcuser = 'rpcuser' self.rpcuser = 'rpcuser'
self.rpcpassword = 'rpcpassword' self.rpcpassword = 'rpcpassword'
self.stopped = False self.stopped = True
self.restart_ready = asyncio.Event()
self.restart_ready.set()
self.running = asyncio.Event() self.running = asyncio.Event()
@property @property
def rpc_url(self): def rpc_url(self):
return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.rpcport}/' return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.rpcport}/'
def is_expected_block(self, e: BlockHeightEvent):
return self.block_expected == e.height
@property @property
def exists(self): def exists(self):
return ( return (
@ -311,6 +364,12 @@ class BlockchainNode:
) )
def download(self): def download(self):
uname = platform.uname()
target_os = str.lower(uname.system)
target_arch = str.replace(uname.machine, 'x86_64', 'amd64')
target_platform = target_os + '_' + target_arch
self.latest_release_url = str.replace(self.latest_release_url, 'TARGET_PLATFORM', target_platform)
downloaded_file = os.path.join( downloaded_file = os.path.join(
self.bin_dir, self.bin_dir,
self.latest_release_url[self.latest_release_url.rfind('/')+1:] self.latest_release_url[self.latest_release_url.rfind('/')+1:]
@ -345,34 +404,34 @@ class BlockchainNode:
async def start(self): async def start(self):
assert self.ensure() assert self.ensure()
self.data_path = tempfile.mkdtemp()
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
asyncio.get_child_watcher().attach_loop(loop) asyncio.get_child_watcher().attach_loop(loop)
command = [ command = [
self.daemon_bin, self.daemon_bin,
f'-datadir={self.data_path}', '-printtoconsole', '-regtest', '-server', '-txindex', '--notls',
f'-rpcuser={self.rpcuser}', f'-rpcpassword={self.rpcpassword}', f'-rpcport={self.rpcport}', f'--datadir={self.data_path}',
f'-port={self.peerport}' '--regtest', f'--listen=127.0.0.1:{self.peerport}', f'--rpclisten=127.0.0.1:{self.rpcport}',
'--txindex', f'--rpcuser={self.rpcuser}', f'--rpcpass={self.rpcpassword}'
] ]
self.log.info(' '.join(command)) self.log.info(' '.join(command))
while not self.stopped: while self.stopped:
if self.running.is_set(): if self.running.is_set():
await asyncio.sleep(1) await asyncio.sleep(1)
continue continue
await self.restart_ready.wait()
try: try:
self.transport, self.protocol = await loop.subprocess_exec( self.transport, self.protocol = await loop.subprocess_exec(
BlockchainProcess, *command LBCDProcess, *command
) )
await self.protocol.ready.wait() await self.protocol.ready.wait()
assert not self.protocol.stopped.is_set() assert not self.protocol.stopped.is_set()
self.running.set() self.running.set()
self.stopped = False
except asyncio.CancelledError: except asyncio.CancelledError:
self.running.clear() self.running.clear()
raise raise
except Exception as e: except Exception as e:
self.running.clear() self.running.clear()
log.exception('failed to start lbrycrdd', exc_info=e) log.exception('failed to start lbcd', exc_info=e)
async def stop(self, cleanup=True): async def stop(self, cleanup=True):
self.stopped = True self.stopped = True
@ -381,35 +440,161 @@ class BlockchainNode:
await self.protocol.stopped.wait() await self.protocol.stopped.wait()
self.transport.close() self.transport.close()
finally: finally:
self.log.info("Done shutting down " + self.daemon_bin)
if cleanup: if cleanup:
self.cleanup() self.cleanup()
self.running.clear()
async def clear_mempool(self): def cleanup(self):
self.restart_ready.clear() assert self.stopped
shutil.rmtree(self.data_path, ignore_errors=True)
class LBCWalletNode:
P2SH_SEGWIT_ADDRESS = "p2sh-segwit"
BECH32_ADDRESS = "bech32"
def __init__(self, url, lbcwallet, cli):
self.latest_release_url = url
self.project_dir = os.path.dirname(os.path.dirname(__file__))
self.bin_dir = os.path.join(self.project_dir, 'bin')
self.lbcwallet_bin = os.path.join(self.bin_dir, lbcwallet)
self.cli_bin = os.path.join(self.bin_dir, cli)
self.log = log.getChild('lbcwallet')
self.protocol = None
self.transport = None
self.hostname = 'localhost'
self.lbcd_rpcport = 29245
self.lbcwallet_rpcport = 29244
self.rpcuser = 'rpcuser'
self.rpcpassword = 'rpcpassword'
self.data_path = tempfile.mkdtemp()
self.stopped = True
self.running = asyncio.Event()
self.block_expected = 0
self.mining_addr = ''
@property
def rpc_url(self):
# FIXME: somehow the hub/sdk doesn't learn the blocks through the Walet RPC port, why?
# return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.lbcwallet_rpcport}/'
return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.lbcd_rpcport}/'
def is_expected_block(self, e: BlockHeightEvent):
return self.block_expected == e.height
@property
def exists(self):
return (
os.path.exists(self.lbcwallet_bin)
)
def download(self):
uname = platform.uname()
target_os = str.lower(uname.system)
target_arch = str.replace(uname.machine, 'x86_64', 'amd64')
target_platform = target_os + '_' + target_arch
self.latest_release_url = str.replace(self.latest_release_url, 'TARGET_PLATFORM', target_platform)
downloaded_file = os.path.join(
self.bin_dir,
self.latest_release_url[self.latest_release_url.rfind('/')+1:]
)
if not os.path.exists(self.bin_dir):
os.mkdir(self.bin_dir)
if not os.path.exists(downloaded_file):
self.log.info('Downloading: %s', self.latest_release_url)
with urllib.request.urlopen(self.latest_release_url) as response:
with open(downloaded_file, 'wb') as out_file:
shutil.copyfileobj(response, out_file)
self.log.info('Extracting: %s', downloaded_file)
if downloaded_file.endswith('.zip'):
with zipfile.ZipFile(downloaded_file) as dotzip:
dotzip.extractall(self.bin_dir)
# zipfile bug https://bugs.python.org/issue15795
os.chmod(self.lbcwallet_bin, 0o755)
elif downloaded_file.endswith('.tar.gz'):
with tarfile.open(downloaded_file) as tar:
tar.extractall(self.bin_dir)
return self.exists
def ensure(self):
return self.exists or self.download()
async def start(self):
assert self.ensure()
loop = asyncio.get_event_loop()
asyncio.get_child_watcher().attach_loop(loop)
command = [
self.lbcwallet_bin,
'--noservertls', '--noclienttls',
'--regtest',
f'--rpcconnect=127.0.0.1:{self.lbcd_rpcport}', f'--rpclisten=127.0.0.1:{self.lbcwallet_rpcport}',
'--createtemp', f'--appdata={self.data_path}',
f'--username={self.rpcuser}', f'--password={self.rpcpassword}'
]
self.log.info(' '.join(command))
while self.stopped:
if self.running.is_set():
await asyncio.sleep(1)
continue
try:
self.transport, self.protocol = await loop.subprocess_exec(
WalletProcess, *command
)
self.protocol.transport = self.transport
await self.protocol.ready.wait()
assert not self.protocol.stopped.is_set()
self.running.set()
self.stopped = False
except asyncio.CancelledError:
self.running.clear()
raise
except Exception as e:
self.running.clear()
log.exception('failed to start lbcwallet', exc_info=e)
def cleanup(self):
assert self.stopped
shutil.rmtree(self.data_path, ignore_errors=True)
async def stop(self, cleanup=True):
self.stopped = True
try:
self.transport.terminate() self.transport.terminate()
await self.protocol.stopped.wait() await self.protocol.stopped.wait()
self.transport.close() self.transport.close()
finally:
self.log.info("Done shutting down " + self.lbcwallet_bin)
if cleanup:
self.cleanup()
self.running.clear() self.running.clear()
os.remove(os.path.join(self.data_path, 'regtest', 'mempool.dat'))
self.restart_ready.set()
await self.running.wait()
def cleanup(self):
shutil.rmtree(self.data_path, ignore_errors=True)
async def _cli_cmnd(self, *args): async def _cli_cmnd(self, *args):
cmnd_args = [ cmnd_args = [
self.cli_bin, f'-datadir={self.data_path}', '-regtest', self.cli_bin,
f'-rpcuser={self.rpcuser}', f'-rpcpassword={self.rpcpassword}', f'-rpcport={self.rpcport}' f'--rpcuser={self.rpcuser}', f'--rpcpass={self.rpcpassword}', '--notls', '--regtest', '--wallet'
] + list(args) ] + list(args)
self.log.info(' '.join(cmnd_args)) self.log.info(' '.join(cmnd_args))
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
asyncio.get_child_watcher().attach_loop(loop) asyncio.get_child_watcher().attach_loop(loop)
process = await asyncio.create_subprocess_exec( process = await asyncio.create_subprocess_exec(
*cmnd_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT *cmnd_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE
) )
out, _ = await process.communicate() out, err = await process.communicate()
result = out.decode().strip() result = out.decode().strip()
err = err.decode().strip()
if len(result) <= 0 and err.startswith('-'):
raise Exception(err)
if err and 'creating a default config file' not in err:
log.warning(err)
self.log.info(result) self.log.info(result)
if result.startswith('error code'): if result.startswith('error code'):
raise Exception(result) raise Exception(result)
@ -417,7 +602,14 @@ class BlockchainNode:
def generate(self, blocks): def generate(self, blocks):
self.block_expected += blocks self.block_expected += blocks
return self._cli_cmnd('generate', str(blocks)) return self._cli_cmnd('generatetoaddress', str(blocks), self.mining_addr)
def generate_to_address(self, blocks, addr):
self.block_expected += blocks
return self._cli_cmnd('generatetoaddress', str(blocks), addr)
def wallet_passphrase(self, passphrase, timeout):
return self._cli_cmnd('walletpassphrase', passphrase, str(timeout))
def invalidate_block(self, blockhash): def invalidate_block(self, blockhash):
return self._cli_cmnd('invalidateblock', blockhash) return self._cli_cmnd('invalidateblock', blockhash)
@ -434,7 +626,7 @@ class BlockchainNode:
def get_raw_change_address(self): def get_raw_change_address(self):
return self._cli_cmnd('getrawchangeaddress') return self._cli_cmnd('getrawchangeaddress')
def get_new_address(self, address_type): def get_new_address(self, address_type='legacy'):
return self._cli_cmnd('getnewaddress', "", address_type) return self._cli_cmnd('getnewaddress', "", address_type)
async def get_balance(self): async def get_balance(self):
@ -450,7 +642,10 @@ class BlockchainNode:
return self._cli_cmnd('createrawtransaction', json.dumps(inputs), json.dumps(outputs)) return self._cli_cmnd('createrawtransaction', json.dumps(inputs), json.dumps(outputs))
async def sign_raw_transaction_with_wallet(self, tx): async def sign_raw_transaction_with_wallet(self, tx):
return json.loads(await self._cli_cmnd('signrawtransactionwithwallet', tx))['hex'].encode() # the "withwallet" portion should only come into play if we are doing segwit.
# and "withwallet" doesn't exist on lbcd yet.
result = await self._cli_cmnd('signrawtransaction', tx)
return json.loads(result)['hex'].encode()
def decode_raw_transaction(self, tx): def decode_raw_transaction(self, tx):
return self._cli_cmnd('decoderawtransaction', hexlify(tx.raw).decode()) return self._cli_cmnd('decoderawtransaction', hexlify(tx.raw).decode())
@ -500,8 +695,6 @@ class HubNode:
self.hostname = 'localhost' self.hostname = 'localhost'
self.rpcport = 50051 # avoid conflict with default rpc port self.rpcport = 50051 # avoid conflict with default rpc port
self.stopped = False self.stopped = False
self.restart_ready = asyncio.Event()
self.restart_ready.set()
self.running = asyncio.Event() self.running = asyncio.Event()
@property @property
@ -558,7 +751,6 @@ class HubNode:
if self.running.is_set(): if self.running.is_set():
await asyncio.sleep(1) await asyncio.sleep(1)
continue continue
await self.restart_ready.wait()
try: try:
if not self.debug: if not self.debug:
self.transport, self.protocol = await loop.subprocess_exec( self.transport, self.protocol = await loop.subprocess_exec(

View file

@ -61,8 +61,10 @@ class ConductorService:
#set_logging( #set_logging(
# self.stack.ledger_module, logging.DEBUG, WebSocketLogHandler(self.send_message) # self.stack.ledger_module, logging.DEBUG, WebSocketLogHandler(self.send_message)
#) #)
self.stack.blockchain_started or await self.stack.start_blockchain() self.stack.lbcd_started or await self.stack.start_lbcd()
self.send_message({'type': 'service', 'name': 'blockchain', 'port': self.stack.blockchain_node.port}) self.send_message({'type': 'service', 'name': 'lbcd', 'port': self.stack.lbcd_node.port})
self.stack.lbcwallet_started or await self.stack.start_lbcwallet()
self.send_message({'type': 'service', 'name': 'lbcwallet', 'port': self.stack.lbcwallet_node.port})
self.stack.spv_started or await self.stack.start_spv() self.stack.spv_started or await self.stack.start_spv()
self.send_message({'type': 'service', 'name': 'spv', 'port': self.stack.spv_node.port}) self.send_message({'type': 'service', 'name': 'spv', 'port': self.stack.spv_node.port})
self.stack.wallet_started or await self.stack.start_wallet() self.stack.wallet_started or await self.stack.start_wallet()
@ -74,7 +76,7 @@ class ConductorService:
async def generate(self, request): async def generate(self, request):
data = await request.post() data = await request.post()
blocks = data.get('blocks', 1) blocks = data.get('blocks', 1)
await self.stack.blockchain_node.generate(int(blocks)) await self.stack.lbcwallet_node.generate(int(blocks))
return json_response({'blocks': blocks}) return json_response({'blocks': blocks})
async def transfer(self, request): async def transfer(self, request):
@ -85,11 +87,14 @@ class ConductorService:
if not address: if not address:
raise ValueError("No address was provided.") raise ValueError("No address was provided.")
amount = data.get('amount', 1) amount = data.get('amount', 1)
txid = await self.stack.blockchain_node.send_to_address(address, amount)
if self.stack.wallet_started: if self.stack.wallet_started:
await self.stack.wallet_node.ledger.on_transaction.where( watcher = self.stack.wallet_node.ledger.on_transaction.where(
lambda e: e.tx.id == txid and e.address == address lambda e: e.address == address # and e.tx.id == txid -- might stall; see send_to_address_and_wait
) )
txid = await self.stack.lbcwallet_node.send_to_address(address, amount)
await watcher
else:
txid = await self.stack.lbcwallet_node.send_to_address(address, amount)
return json_response({ return json_response({
'address': address, 'address': address,
'amount': amount, 'amount': amount,
@ -98,7 +103,7 @@ class ConductorService:
async def balance(self, _): async def balance(self, _):
return json_response({ return json_response({
'balance': await self.stack.blockchain_node.get_balance() 'balance': await self.stack.lbcwallet_node.get_balance()
}) })
async def log(self, request): async def log(self, request):
@ -129,7 +134,7 @@ class ConductorService:
'type': 'status', 'type': 'status',
'height': self.stack.wallet_node.ledger.headers.height, 'height': self.stack.wallet_node.ledger.headers.height,
'balance': satoshis_to_coins(await self.stack.wallet_node.account.get_balance()), 'balance': satoshis_to_coins(await self.stack.wallet_node.account.get_balance()),
'miner': await self.stack.blockchain_node.get_balance() 'miner': await self.stack.lbcwallet_node.get_balance()
}) })
def send_message(self, msg): def send_message(self, msg):

View file

@ -250,14 +250,14 @@ class Daemon:
async def deserialised_block(self, hex_hash): async def deserialised_block(self, hex_hash):
"""Return the deserialised block with the given hex hash.""" """Return the deserialised block with the given hex hash."""
if hex_hash not in self._block_cache: if hex_hash not in self._block_cache:
block = await self._send_single('getblock', (hex_hash, True)) block = await self._send_single('getblock', (hex_hash, 1))
self._block_cache[hex_hash] = block self._block_cache[hex_hash] = block
return block return block
return self._block_cache[hex_hash] return self._block_cache[hex_hash]
async def raw_blocks(self, hex_hashes): async def raw_blocks(self, hex_hashes):
"""Return the raw binary blocks with the given hex hashes.""" """Return the raw binary blocks with the given hex hashes."""
params_iterable = ((h, False) for h in hex_hashes) params_iterable = ((h, 0) for h in hex_hashes)
blocks = await self._send_vector('getblock', params_iterable) blocks = await self._send_vector('getblock', params_iterable)
# Convert hex string to bytes # Convert hex string to bytes
return [hex_to_bytes(block) for block in blocks] return [hex_to_bytes(block) for block in blocks]
@ -334,42 +334,7 @@ class LBCDaemon(Daemon):
async def getrawtransaction(self, hex_hash, verbose=False): async def getrawtransaction(self, hex_hash, verbose=False):
return await super().getrawtransaction(hex_hash=hex_hash, verbose=verbose) return await super().getrawtransaction(hex_hash=hex_hash, verbose=verbose)
@handles_errors
async def getclaimbyid(self, claim_id):
'''Given a claim id, retrieves claim information.'''
return await self._send_single('getclaimbyid', (claim_id,))
@handles_errors
async def getclaimsbyids(self, claim_ids):
'''Given a list of claim ids, batches calls to retrieve claim information.'''
return await self._send_vector('getclaimbyid', ((claim_id,) for claim_id in claim_ids))
@handles_errors @handles_errors
async def getclaimsforname(self, name): async def getclaimsforname(self, name):
'''Given a name, retrieves all claims matching that name.''' '''Given a name, retrieves all claims matching that name.'''
return await self._send_single('getclaimsforname', (name,)) return await self._send_single('getclaimsforname', (name,))
@handles_errors
async def getclaimsfortx(self, txid):
'''Given a txid, returns the claims it make.'''
return await self._send_single('getclaimsfortx', (txid,)) or []
@handles_errors
async def getnameproof(self, name, block_hash=None):
'''Given a name and optional block_hash, returns a name proof and winner, if any.'''
return await self._send_single('getnameproof', (name, block_hash,) if block_hash else (name,))
@handles_errors
async def getvalueforname(self, name):
'''Given a name, returns the winning claim value.'''
return await self._send_single('getvalueforname', (name,))
@handles_errors
async def getnamesintrie(self):
'''Given a name, returns the winning claim value.'''
return await self._send_single('getnamesintrie')
@handles_errors
async def claimname(self, name, hexvalue, amount):
'''Claim a name, used for functional tests only.'''
return await self._send_single('claimname', (name, hexvalue, float(amount)))

View file

@ -1,32 +1,130 @@
import struct import struct
import rocksdb
from typing import Optional from typing import Optional
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
class KeyValueStorage: class RocksDBStore:
def __init__(self, path: str, cache_mb: int, max_open_files: int, secondary_path: str = ''):
# Use snappy compression (the default)
self.path = path
self._max_open_files = max_open_files
self.db = rocksdb.DB(path, self.get_options(), secondary_name=secondary_path)
# self.multi_get = self.db.multi_get
def get_options(self):
return rocksdb.Options(
create_if_missing=True, use_fsync=True, target_file_size_base=33554432,
max_open_files=self._max_open_files
)
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]: def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
raise NotImplemented() return self.db.get(key, fill_cache=fill_cache)
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None, def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
include_key=True, include_value=True, fill_cache=True): include_key=True, include_value=True, fill_cache=True):
raise NotImplemented() return RocksDBIterator(
self.db, reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
prefix=prefix, include_key=include_key, include_value=include_value
)
def write_batch(self, transaction: bool = False): def write_batch(self, disable_wal: bool = False, sync: bool = False):
raise NotImplemented() return RocksDBWriteBatch(self.db, sync=sync, disable_wal=disable_wal)
def close(self): def close(self):
raise NotImplemented() self.db.close()
self.db = None
@property @property
def closed(self) -> bool: def closed(self) -> bool:
raise NotImplemented() return self.db is None
def try_catch_up_with_primary(self):
self.db.try_catch_up_with_primary()
class RocksDBWriteBatch:
def __init__(self, db: rocksdb.DB, sync: bool = False, disable_wal: bool = False):
self.batch = rocksdb.WriteBatch()
self.db = db
self.sync = sync
self.disable_wal = disable_wal
def __enter__(self):
return self.batch
def __exit__(self, exc_type, exc_val, exc_tb):
if not exc_val:
self.db.write(self.batch, sync=self.sync, disable_wal=self.disable_wal)
class RocksDBIterator:
"""An iterator for RocksDB."""
__slots__ = [
'start',
'prefix',
'stop',
'iterator',
'include_key',
'include_value',
'prev_k',
'reverse',
'include_start',
'include_stop'
]
def __init__(self, db: rocksdb.DB, prefix: bytes = None, start: bool = None, stop: bytes = None,
include_key: bool = True, include_value: bool = True, reverse: bool = False,
include_start: bool = True, include_stop: bool = False):
assert (start is None and stop is None) or (prefix is None), 'cannot use start/stop and prefix'
self.start = start
self.prefix = prefix
self.stop = stop
self.iterator = db.iteritems() if not reverse else reversed(db.iteritems())
if prefix is not None:
self.iterator.seek(prefix)
elif start is not None:
self.iterator.seek(start)
self.include_key = include_key
self.include_value = include_value
self.prev_k = None
self.reverse = reverse
self.include_start = include_start
self.include_stop = include_stop
def __iter__(self):
return self
def _check_stop_iteration(self, key: bytes):
if self.stop is not None and (key.startswith(self.stop) or self.stop < key[:len(self.stop)]):
raise StopIteration
elif self.start is not None and self.start > key[:len(self.start)]:
raise StopIteration
elif self.prefix is not None and not key.startswith(self.prefix):
raise StopIteration
def __next__(self):
# TODO: include start/stop on/off
# check for needing to stop from previous iteration
if self.prev_k is not None:
self._check_stop_iteration(self.prev_k)
k, v = next(self.iterator)
self._check_stop_iteration(k)
self.prev_k = k
if self.include_key and self.include_value:
return k, v
elif self.include_key:
return k
return v
class PrefixDB: class PrefixDB:
UNDO_KEY_STRUCT = struct.Struct(b'>Q') UNDO_KEY_STRUCT = struct.Struct(b'>Q')
def __init__(self, db: KeyValueStorage, max_undo_depth: int = 200, unsafe_prefixes=None): def __init__(self, db: RocksDBStore, max_undo_depth: int = 200, unsafe_prefixes=None):
self._db = db self._db = db
self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes) self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes)
self._max_undo_depth = max_undo_depth self._max_undo_depth = max_undo_depth
@ -37,7 +135,7 @@ class PrefixDB:
Changes written cannot be undone Changes written cannot be undone
""" """
try: try:
with self._db.write_batch(transaction=True) as batch: with self._db.write_batch(sync=True) as batch:
batch_put = batch.put batch_put = batch.put
batch_delete = batch.delete batch_delete = batch.delete
for staged_change in self._op_stack: for staged_change in self._op_stack:
@ -61,7 +159,7 @@ class PrefixDB:
include_value=False include_value=False
)) ))
try: try:
with self._db.write_batch(transaction=True) as batch: with self._db.write_batch(sync=True) as batch:
batch_put = batch.put batch_put = batch.put
batch_delete = batch.delete batch_delete = batch.delete
for staged_change in self._op_stack: for staged_change in self._op_stack:
@ -82,7 +180,7 @@ class PrefixDB:
undo_key = DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height) undo_key = DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height)
self._op_stack.apply_packed_undo_ops(self._db.get(undo_key)) self._op_stack.apply_packed_undo_ops(self._db.get(undo_key))
try: try:
with self._db.write_batch(transaction=True) as batch: with self._db.write_batch(sync=True) as batch:
batch_put = batch.put batch_put = batch.put
batch_delete = batch.delete batch_delete = batch.delete
for staged_change in self._op_stack: for staged_change in self._op_stack:
@ -108,6 +206,9 @@ class PrefixDB:
if not self._db.closed: if not self._db.closed:
self._db.close() self._db.close()
def try_catch_up_with_primary(self):
self._db.try_catch_up_with_primary()
@property @property
def closed(self): def closed(self):
return self._db.closed return self._db.closed

View file

@ -4,7 +4,7 @@ import array
import base64 import base64
from typing import Union, Tuple, NamedTuple, Optional from typing import Union, Tuple, NamedTuple, Optional
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.db import KeyValueStorage, PrefixDB from lbry.wallet.server.db.db import RocksDBStore, PrefixDB
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
from lbry.schema.url import normalize_name from lbry.schema.url import normalize_name
@ -38,7 +38,7 @@ class PrefixRow(metaclass=PrefixRowType):
value_struct: struct.Struct value_struct: struct.Struct
key_part_lambdas = [] key_part_lambdas = []
def __init__(self, db: KeyValueStorage, op_stack: RevertableOpStack): def __init__(self, db: RocksDBStore, op_stack: RevertableOpStack):
self._db = db self._db = db
self._op_stack = op_stack self._op_stack = op_stack
@ -1595,40 +1595,10 @@ class BlockTxsPrefixRow(PrefixRow):
return cls.pack_key(height), cls.pack_value(tx_hashes) return cls.pack_key(height), cls.pack_value(tx_hashes)
class LevelDBStore(KeyValueStorage):
def __init__(self, path: str, cache_mb: int, max_open_files: int):
import plyvel
self.db = plyvel.DB(
path, create_if_missing=True, max_open_files=max_open_files,
lru_cache_size=cache_mb * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
)
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
return self.db.get(key, fill_cache=fill_cache)
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
include_key=True, include_value=True, fill_cache=True):
return self.db.iterator(
reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
prefix=prefix, include_key=include_key, include_value=include_value, fill_cache=fill_cache
)
def write_batch(self, transaction: bool = False, sync: bool = False):
return self.db.write_batch(transaction=transaction, sync=sync)
def close(self):
return self.db.close()
@property
def closed(self) -> bool:
return self.db.closed
class HubDB(PrefixDB): class HubDB(PrefixDB):
def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512, def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512,
unsafe_prefixes: Optional[typing.Set[bytes]] = None): secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None):
db = LevelDBStore(path, cache_mb, max_open_files) db = RocksDBStore(path, cache_mb, max_open_files, secondary_path=secondary_path)
super().__init__(db, reorg_limit, unsafe_prefixes=unsafe_prefixes) super().__init__(db, reorg_limit, unsafe_prefixes=unsafe_prefixes)
self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack) self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack)
self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack) self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack)

View file

@ -26,7 +26,10 @@ class Server:
self.prometheus_server: typing.Optional[PrometheusServer] = None self.prometheus_server: typing.Optional[PrometheusServer] = None
self.session_mgr = LBRYSessionManager( self.session_mgr = LBRYSessionManager(
env, db, bp, daemon, self.shutdown_event env, db, bp.mempool, bp.history_cache, bp.resolve_cache, bp.resolve_outputs_cache, daemon,
self.shutdown_event,
on_available_callback=bp.status_server.set_available,
on_unavailable_callback=bp.status_server.set_unavailable
) )
self._indexer_task = None self._indexer_task = None

View file

@ -170,13 +170,16 @@ class SessionManager:
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
) )
def __init__(self, env: 'Env', db: LevelDB, bp: BlockProcessor, daemon: 'Daemon', shutdown_event: asyncio.Event): def __init__(self, env: 'Env', db: LevelDB, mempool, history_cache, resolve_cache, resolve_outputs_cache,
daemon: 'Daemon', shutdown_event: asyncio.Event,
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
self.env = env self.env = env
self.db = db self.db = db
self.bp = bp self.on_available_callback = on_available_callback
self.on_unavailable_callback = on_unavailable_callback
self.daemon = daemon self.daemon = daemon
self.mempool = bp.mempool self.mempool = mempool
self.shutdown_event = shutdown_event self.shutdown_event = shutdown_event
self.logger = util.class_logger(__name__, self.__class__.__name__) self.logger = util.class_logger(__name__, self.__class__.__name__)
self.servers: typing.Dict[str, asyncio.AbstractServer] = {} self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
@ -186,7 +189,9 @@ class SessionManager:
self.cur_group = SessionGroup(0) self.cur_group = SessionGroup(0)
self.txs_sent = 0 self.txs_sent = 0
self.start_time = time.time() self.start_time = time.time()
self.history_cache = self.bp.history_cache self.history_cache = history_cache
self.resolve_cache = resolve_cache
self.resolve_outputs_cache = resolve_outputs_cache
self.notified_height: typing.Optional[int] = None self.notified_height: typing.Optional[int] = None
# Cache some idea of room to avoid recounting on each subscription # Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0 self.subs_room = 0
@ -243,7 +248,7 @@ class SessionManager:
await self.session_event.wait() await self.session_event.wait()
self.session_event.clear() self.session_event.clear()
if not paused and len(self.sessions) >= max_sessions: if not paused and len(self.sessions) >= max_sessions:
self.bp.status_server.set_unavailable() self.on_unavailable_callback()
self.logger.info(f'maximum sessions {max_sessions:,d} ' self.logger.info(f'maximum sessions {max_sessions:,d} '
f'reached, stopping new connections until ' f'reached, stopping new connections until '
f'count drops to {low_watermark:,d}') f'count drops to {low_watermark:,d}')
@ -252,7 +257,7 @@ class SessionManager:
# Start listening for incoming connections if paused and # Start listening for incoming connections if paused and
# session count has fallen # session count has fallen
if paused and len(self.sessions) <= low_watermark: if paused and len(self.sessions) <= low_watermark:
self.bp.status_server.set_available() self.on_available_callback()
self.logger.info('resuming listening for incoming connections') self.logger.info('resuming listening for incoming connections')
await self._start_external_servers() await self._start_external_servers()
paused = False paused = False
@ -533,7 +538,7 @@ class SessionManager:
await self.start_other() await self.start_other()
await self._start_external_servers() await self._start_external_servers()
server_listening_event.set() server_listening_event.set()
self.bp.status_server.set_available() self.on_available_callback()
# Peer discovery should start after the external servers # Peer discovery should start after the external servers
# because we connect to ourself # because we connect to ourself
await asyncio.wait([ await asyncio.wait([
@ -628,8 +633,9 @@ class SessionManager:
for hashX in touched.intersection(self.mempool_statuses.keys()): for hashX in touched.intersection(self.mempool_statuses.keys()):
self.mempool_statuses.pop(hashX, None) self.mempool_statuses.pop(hashX, None)
# self.bp._chain_executor
await asyncio.get_event_loop().run_in_executor( await asyncio.get_event_loop().run_in_executor(
self.bp._chain_executor, touched.intersection_update, self.hashx_subscriptions_by_session.keys() None, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
) )
if touched or new_touched or (height_changed and self.mempool_statuses): if touched or new_touched or (height_changed and self.mempool_statuses):
@ -866,8 +872,7 @@ class LBRYElectrumX(SessionBase):
self.protocol_tuple = self.PROTOCOL_MIN self.protocol_tuple = self.PROTOCOL_MIN
self.protocol_string = None self.protocol_string = None
self.daemon = self.session_mgr.daemon self.daemon = self.session_mgr.daemon
self.bp: BlockProcessor = self.session_mgr.bp self.db: LevelDB = self.session_mgr.db
self.db: LevelDB = self.bp.db
@classmethod @classmethod
def protocol_min_max_strings(cls): def protocol_min_max_strings(cls):
@ -1008,21 +1013,21 @@ class LBRYElectrumX(SessionBase):
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start) self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
async def _cached_resolve_url(self, url): async def _cached_resolve_url(self, url):
if url not in self.bp.resolve_cache: if url not in self.session_mgr.resolve_cache:
self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url) self.session_mgr.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
return self.bp.resolve_cache[url] return self.session_mgr.resolve_cache[url]
async def claimtrie_resolve(self, *urls) -> str: async def claimtrie_resolve(self, *urls) -> str:
sorted_urls = tuple(sorted(urls)) sorted_urls = tuple(sorted(urls))
self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls)) self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
try: try:
if sorted_urls in self.bp.resolve_outputs_cache: if sorted_urls in self.session_mgr.resolve_outputs_cache:
return self.bp.resolve_outputs_cache[sorted_urls] return self.session_mgr.resolve_outputs_cache[sorted_urls]
rows, extra = [], [] rows, extra = [], []
for url in urls: for url in urls:
if url not in self.bp.resolve_cache: if url not in self.session_mgr.resolve_cache:
self.bp.resolve_cache[url] = await self._cached_resolve_url(url) self.session_mgr.resolve_cache[url] = await self._cached_resolve_url(url)
stream, channel, repost, reposted_channel = self.bp.resolve_cache[url] stream, channel, repost, reposted_channel = self.session_mgr.resolve_cache[url]
if isinstance(channel, ResolveCensoredError): if isinstance(channel, ResolveCensoredError):
rows.append(channel) rows.append(channel)
extra.append(channel.censor_row) extra.append(channel.censor_row)
@ -1047,7 +1052,7 @@ class LBRYElectrumX(SessionBase):
if reposted_channel: if reposted_channel:
extra.append(reposted_channel) extra.append(reposted_channel)
await asyncio.sleep(0) await asyncio.sleep(0)
self.bp.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor( self.session_mgr.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
None, Outputs.to_base64, rows, extra, 0, None, None None, Outputs.to_base64, rows, extra, 0, None, None
) )
return result return result
@ -1055,7 +1060,7 @@ class LBRYElectrumX(SessionBase):
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls)) self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
async def get_server_height(self): async def get_server_height(self):
return self.bp.height return self.db.db_height
async def transaction_get_height(self, tx_hash): async def transaction_get_height(self, tx_hash):
self.assert_tx_hash(tx_hash) self.assert_tx_hash(tx_hash)
@ -1467,7 +1472,7 @@ class LBRYElectrumX(SessionBase):
if mempool_tx: if mempool_tx:
raw_tx, block_hash = mempool_tx.raw_tx.hex(), None raw_tx, block_hash = mempool_tx.raw_tx.hex(), None
else: else:
tx_info = await self.daemon_request('getrawtransaction', tx_hash, True) tx_info = await self.daemon_request('getrawtransaction', tx_hash, 1)
raw_tx = tx_info['hex'] raw_tx = tx_info['hex']
block_hash = tx_info.get('blockhash') block_hash = tx_info.get('blockhash')
if block_hash: if block_hash:
@ -1504,7 +1509,7 @@ class LBRYElectrumX(SessionBase):
if verbose not in (True, False): if verbose not in (True, False):
raise RPCError(BAD_REQUEST, f'"verbose" must be a boolean') raise RPCError(BAD_REQUEST, f'"verbose" must be a boolean')
return await self.daemon_request('getrawtransaction', tx_hash, verbose) return await self.daemon_request('getrawtransaction', tx_hash, int(verbose))
def _get_merkle_branch(self, tx_hashes, tx_pos): def _get_merkle_branch(self, tx_hashes, tx_pos):
"""Return a merkle branch to a transaction. """Return a merkle branch to a transaction.

View file

@ -40,22 +40,17 @@ def checkrecord(record, expected_winner, expected_claim):
async def checkcontrolling(daemon: Daemon, db: SQLDB): async def checkcontrolling(daemon: Daemon, db: SQLDB):
records, claim_ids, names, futs = [], [], [], [] records, names, futs = [], [], []
for record in db.get_claims('claimtrie.claim_hash as is_controlling, claim.*', is_controlling=True): for record in db.get_claims('claimtrie.claim_hash as is_controlling, claim.*', is_controlling=True):
records.append(record) records.append(record)
claim_id = hex_reverted(record['claim_hash']) claim_id = hex_reverted(record['claim_hash'])
claim_ids.append((claim_id,)) names.append((record['normalized'], (claim_id,), "", True)) # last parameter is IncludeValues
names.append((record['normalized'],))
if len(names) > 50000: if len(names) > 50000:
futs.append(daemon._send_vector('getvalueforname', names[:])) futs.append(daemon._send_vector('getclaimsfornamebyid', names))
futs.append(daemon._send_vector('getclaimbyid', claim_ids[:]))
names.clear() names.clear()
claim_ids.clear()
if names: if names:
futs.append(daemon._send_vector('getvalueforname', names[:])) futs.append(daemon._send_vector('getclaimsfornamebyid', names))
futs.append(daemon._send_vector('getclaimbyid', claim_ids[:]))
names.clear() names.clear()
claim_ids.clear()
while futs: while futs:
winners, claims = futs.pop(0), futs.pop(0) winners, claims = futs.pop(0), futs.pop(0)

View file

@ -7,9 +7,11 @@ BASE = os.path.dirname(__file__)
with open(os.path.join(BASE, 'README.md'), encoding='utf-8') as fh: with open(os.path.join(BASE, 'README.md'), encoding='utf-8') as fh:
long_description = fh.read() long_description = fh.read()
PLYVEL = []
if sys.platform.startswith('linux'): ROCKSDB = []
PLYVEL.append('plyvel==1.3.0') if sys.platform.startswith('linux') or sys.platform.startswith('darwin'):
ROCKSDB.append('lbry-rocksdb==0.8.1')
setup( setup(
name=__name__, name=__name__,
@ -57,7 +59,7 @@ setup(
'pylru==1.1.0', 'pylru==1.1.0',
'elasticsearch==7.10.1', 'elasticsearch==7.10.1',
'grpcio==1.38.0' 'grpcio==1.38.0'
] + PLYVEL, ] + ROCKSDB,
extras_require={ extras_require={
'torrent': ['lbry-libtorrent'], 'torrent': ['lbry-libtorrent'],
'lint': ['pylint==2.10.0'], 'lint': ['pylint==2.10.0'],

47
test_rocksdb.py Executable file
View file

@ -0,0 +1,47 @@
#! python
import os
import shutil
import rocksdb
import tempfile
import logging
log = logging.getLogger()
log.addHandler(logging.StreamHandler())
log.setLevel(logging.INFO)
def _main(db_loc):
opts = rocksdb.Options(create_if_missing=True)
db = rocksdb.DB(os.path.join(db_loc, "test"), opts)
secondary_location = os.path.join(db_loc, "secondary")
secondary = rocksdb.DB(
os.path.join(db_loc, "test"),
rocksdb.Options(create_if_missing=True, max_open_files=-1),
secondary_name=secondary_location
)
try:
assert secondary.get(b"a") is None
db.put(b"a", b"b")
assert db.get(b"a") == b"b"
assert secondary.get(b"a") is None
secondary.try_catch_up_with_primary()
assert secondary.get(b"a") == b"b"
finally:
secondary.close()
db.close()
def main():
db_dir = tempfile.mkdtemp()
try:
_main(db_dir)
log.info("rocksdb %s (%s) works!", rocksdb.__version__, rocksdb.ROCKSDB_VERSION)
except:
log.exception("boom")
finally:
shutil.rmtree(db_dir)
if __name__ == "__main__":
main()

View file

@ -103,7 +103,7 @@ class AccountManagement(CommandTestCase):
second_account = await self.daemon.jsonrpc_account_create('second account') second_account = await self.daemon.jsonrpc_account_create('second account')
tx = await self.daemon.jsonrpc_account_send( tx = await self.daemon.jsonrpc_account_send(
'0.05', await self.daemon.jsonrpc_address_unused(account_id=second_account.id) '0.05', await self.daemon.jsonrpc_address_unused(account_id=second_account.id), blocking=True
) )
await self.confirm_tx(tx.id) await self.confirm_tx(tx.id)
await self.assertOutputAmount(['0.05', '9.949876'], utxo_list()) await self.assertOutputAmount(['0.05', '9.949876'], utxo_list())

View file

@ -112,7 +112,7 @@ class BlockchainReorganizationTests(CommandTestCase):
# reorg the last block dropping our claim tx # reorg the last block dropping our claim tx
await self.blockchain.invalidate_block(invalidated_block_hash) await self.blockchain.invalidate_block(invalidated_block_hash)
await self.blockchain.clear_mempool() await self.conductor.clear_mempool()
await self.blockchain.generate(2) await self.blockchain.generate(2)
# wait for the client to catch up and verify the reorg # wait for the client to catch up and verify the reorg
@ -191,7 +191,7 @@ class BlockchainReorganizationTests(CommandTestCase):
# reorg the last block dropping our claim tx # reorg the last block dropping our claim tx
await self.blockchain.invalidate_block(invalidated_block_hash) await self.blockchain.invalidate_block(invalidated_block_hash)
await self.blockchain.clear_mempool() await self.conductor.clear_mempool()
await self.blockchain.generate(2) await self.blockchain.generate(2)
# wait for the client to catch up and verify the reorg # wait for the client to catch up and verify the reorg

View file

@ -101,12 +101,7 @@ class ReconnectTests(IntegrationTestCase):
self.ledger.network.client.transport.close() self.ledger.network.client.transport.close()
self.assertFalse(self.ledger.network.is_connected) self.assertFalse(self.ledger.network.is_connected)
await self.ledger.resolve([], 'derp') await self.ledger.resolve([], 'derp')
sendtxid = await self.blockchain.send_to_address(address1, 1.1337) sendtxid = await self.send_to_address_and_wait(address1, 1.1337, 1)
# await self.ledger.resolve([], 'derp')
# self.assertTrue(self.ledger.network.is_connected)
await asyncio.wait_for(self.on_transaction_id(sendtxid), 10.0) # mempool
await self.blockchain.generate(1)
await self.on_transaction_id(sendtxid) # confirmed
self.assertLess(self.ledger.network.client.response_time, 1) # response time properly set lower, we are fine self.assertLess(self.ledger.network.client.response_time, 1) # response time properly set lower, we are fine
await self.assertBalance(self.account, '1.1337') await self.assertBalance(self.account, '1.1337')
@ -135,7 +130,7 @@ class ReconnectTests(IntegrationTestCase):
await self.conductor.spv_node.stop() await self.conductor.spv_node.stop()
self.assertFalse(self.ledger.network.is_connected) self.assertFalse(self.ledger.network.is_connected)
await asyncio.sleep(0.2) # let it retry and fail once await asyncio.sleep(0.2) # let it retry and fail once
await self.conductor.spv_node.start(self.conductor.blockchain_node) await self.conductor.spv_node.start(self.conductor.lbcwallet_node)
await self.ledger.network.on_connected.first await self.ledger.network.on_connected.first
self.assertTrue(self.ledger.network.is_connected) self.assertTrue(self.ledger.network.is_connected)
@ -165,8 +160,10 @@ class UDPServerFailDiscoveryTest(AsyncioTestCase):
async def test_wallet_connects_despite_lack_of_udp(self): async def test_wallet_connects_despite_lack_of_udp(self):
conductor = Conductor() conductor = Conductor()
conductor.spv_node.udp_port = '0' conductor.spv_node.udp_port = '0'
await conductor.start_blockchain() await conductor.start_lbcd()
self.addCleanup(conductor.stop_blockchain) self.addCleanup(conductor.stop_lbcd)
await conductor.start_lbcwallet()
self.addCleanup(conductor.stop_lbcwallet)
await conductor.start_spv() await conductor.start_spv()
self.addCleanup(conductor.stop_spv) self.addCleanup(conductor.stop_spv)
self.assertFalse(conductor.spv_node.server.bp.status_server.is_running) self.assertFalse(conductor.spv_node.server.bp.status_server.is_running)

View file

@ -174,8 +174,7 @@ class PurchaseCommandTests(CommandTestCase):
self.merchant_address = await self.account.receiving.get_or_create_usable_address() self.merchant_address = await self.account.receiving.get_or_create_usable_address()
daemon2 = await self.add_daemon() daemon2 = await self.add_daemon()
address2 = await daemon2.wallet_manager.default_account.receiving.get_or_create_usable_address() address2 = await daemon2.wallet_manager.default_account.receiving.get_or_create_usable_address()
sendtxid = await self.blockchain.send_to_address(address2, 2) await self.send_to_address_and_wait(address2, 2, 1, ledger=daemon2.ledger)
await self.confirm_tx(sendtxid, daemon2.ledger)
stream = await self.priced_stream('a', '1.0') stream = await self.priced_stream('a', '1.0')
await self.assertBalance(self.account, '9.987893') await self.assertBalance(self.account, '9.987893')

View file

@ -23,7 +23,7 @@ class WalletCommands(CommandTestCase):
async def test_wallet_syncing_status(self): async def test_wallet_syncing_status(self):
address = await self.daemon.jsonrpc_address_unused() address = await self.daemon.jsonrpc_address_unused()
self.assertFalse(self.daemon.jsonrpc_wallet_status()['is_syncing']) self.assertFalse(self.daemon.jsonrpc_wallet_status()['is_syncing'])
await self.blockchain.send_to_address(address, 1) await self.send_to_address_and_wait(address, 1)
await self.ledger._update_tasks.started.wait() await self.ledger._update_tasks.started.wait()
self.assertTrue(self.daemon.jsonrpc_wallet_status()['is_syncing']) self.assertTrue(self.daemon.jsonrpc_wallet_status()['is_syncing'])
await self.ledger._update_tasks.done.wait() await self.ledger._update_tasks.done.wait()
@ -49,7 +49,7 @@ class WalletCommands(CommandTestCase):
self.assertEqual(status['wallet']['servers'][0]['port'], 50002) self.assertEqual(status['wallet']['servers'][0]['port'], 50002)
await self.conductor.spv_node.stop(True) await self.conductor.spv_node.stop(True)
self.conductor.spv_node.port = 54320 self.conductor.spv_node.port = 54320
await self.conductor.spv_node.start(self.conductor.blockchain_node) await self.conductor.spv_node.start(self.conductor.lbcwallet_node)
status = await self.daemon.jsonrpc_status() status = await self.daemon.jsonrpc_status()
self.assertEqual(len(status['wallet']['servers']), 0) self.assertEqual(len(status['wallet']['servers']), 0)
self.daemon.jsonrpc_settings_set('lbryum_servers', ['localhost:54320']) self.daemon.jsonrpc_settings_set('lbryum_servers', ['localhost:54320'])
@ -59,23 +59,22 @@ class WalletCommands(CommandTestCase):
self.assertEqual(status['wallet']['servers'][0]['port'], 54320) self.assertEqual(status['wallet']['servers'][0]['port'], 54320)
async def test_sending_to_scripthash_address(self): async def test_sending_to_scripthash_address(self):
self.assertEqual(await self.blockchain.get_balance(), '95.99973580') bal = await self.blockchain.get_balance()
await self.assertBalance(self.account, '10.0') await self.assertBalance(self.account, '10.0')
p2sh_address1 = await self.blockchain.get_new_address(self.blockchain.P2SH_SEGWIT_ADDRESS) p2sh_address1 = await self.blockchain.get_new_address(self.blockchain.P2SH_SEGWIT_ADDRESS)
tx = await self.account_send('2.0', p2sh_address1) tx = await self.account_send('2.0', p2sh_address1)
self.assertEqual(tx['outputs'][0]['address'], p2sh_address1) self.assertEqual(tx['outputs'][0]['address'], p2sh_address1)
self.assertEqual(await self.blockchain.get_balance(), '98.99973580') # +1 lbc for confirm block self.assertEqual(await self.blockchain.get_balance(), str(float(bal)+3)) # +1 lbc for confirm block
await self.assertBalance(self.account, '7.999877') await self.assertBalance(self.account, '7.999877')
await self.wallet_send('3.0', p2sh_address1) await self.wallet_send('3.0', p2sh_address1)
self.assertEqual(await self.blockchain.get_balance(), '102.99973580') # +1 lbc for confirm block self.assertEqual(await self.blockchain.get_balance(), str(float(bal)+7)) # +1 lbc for confirm block
await self.assertBalance(self.account, '4.999754') await self.assertBalance(self.account, '4.999754')
async def test_balance_caching(self): async def test_balance_caching(self):
account2 = await self.daemon.jsonrpc_account_create("Tip-er") account2 = await self.daemon.jsonrpc_account_create("Tip-er")
address2 = await self.daemon.jsonrpc_address_unused(account2.id) address2 = await self.daemon.jsonrpc_address_unused(account2.id)
sendtxid = await self.blockchain.send_to_address(address2, 10) await self.send_to_address_and_wait(address2, 10, 2)
await self.confirm_tx(sendtxid) await self.ledger.tasks_are_done() # don't mess with the query count while we need it
await self.generate(1)
wallet_balance = self.daemon.jsonrpc_wallet_balance wallet_balance = self.daemon.jsonrpc_wallet_balance
ledger = self.ledger ledger = self.ledger
@ -90,14 +89,16 @@ class WalletCommands(CommandTestCase):
self.assertIsNone(ledger._balance_cache.get(self.account.id)) self.assertIsNone(ledger._balance_cache.get(self.account.id))
query_count += 2 query_count += 2
self.assertEqual(await wallet_balance(), expected) balance = await wallet_balance()
self.assertEqual(self.ledger.db.db.query_count, query_count) self.assertEqual(self.ledger.db.db.query_count, query_count)
self.assertEqual(balance, expected)
self.assertEqual(dict_values_to_lbc(ledger._balance_cache.get(self.account.id))['total'], '10.0') self.assertEqual(dict_values_to_lbc(ledger._balance_cache.get(self.account.id))['total'], '10.0')
self.assertEqual(dict_values_to_lbc(ledger._balance_cache.get(account2.id))['total'], '10.0') self.assertEqual(dict_values_to_lbc(ledger._balance_cache.get(account2.id))['total'], '10.0')
# calling again uses cache # calling again uses cache
self.assertEqual(await wallet_balance(), expected) balance = await wallet_balance()
self.assertEqual(self.ledger.db.db.query_count, query_count) self.assertEqual(self.ledger.db.db.query_count, query_count)
self.assertEqual(balance, expected)
self.assertEqual(dict_values_to_lbc(ledger._balance_cache.get(self.account.id))['total'], '10.0') self.assertEqual(dict_values_to_lbc(ledger._balance_cache.get(self.account.id))['total'], '10.0')
self.assertEqual(dict_values_to_lbc(ledger._balance_cache.get(account2.id))['total'], '10.0') self.assertEqual(dict_values_to_lbc(ledger._balance_cache.get(account2.id))['total'], '10.0')
@ -123,8 +124,7 @@ class WalletCommands(CommandTestCase):
wallet2 = await self.daemon.jsonrpc_wallet_create('foo', create_account=True) wallet2 = await self.daemon.jsonrpc_wallet_create('foo', create_account=True)
account3 = wallet2.default_account account3 = wallet2.default_account
address3 = await self.daemon.jsonrpc_address_unused(account3.id, wallet2.id) address3 = await self.daemon.jsonrpc_address_unused(account3.id, wallet2.id)
await self.confirm_tx(await self.blockchain.send_to_address(address3, 1)) await self.send_to_address_and_wait(address3, 1, 1)
await self.generate(1)
account_balance = self.daemon.jsonrpc_account_balance account_balance = self.daemon.jsonrpc_account_balance
wallet_balance = self.daemon.jsonrpc_wallet_balance wallet_balance = self.daemon.jsonrpc_wallet_balance
@ -154,7 +154,7 @@ class WalletCommands(CommandTestCase):
address2 = await self.daemon.jsonrpc_address_unused(account2.id) address2 = await self.daemon.jsonrpc_address_unused(account2.id)
# send lbc to someone else # send lbc to someone else
tx = await self.daemon.jsonrpc_account_send('1.0', address2) tx = await self.daemon.jsonrpc_account_send('1.0', address2, blocking=True)
await self.confirm_tx(tx.id) await self.confirm_tx(tx.id)
self.assertEqual(await account_balance(), { self.assertEqual(await account_balance(), {
'total': '8.97741', 'total': '8.97741',
@ -187,7 +187,7 @@ class WalletCommands(CommandTestCase):
}) })
# tip claimed # tip claimed
tx = await self.daemon.jsonrpc_support_abandon(txid=support1['txid'], nout=0) tx = await self.daemon.jsonrpc_support_abandon(txid=support1['txid'], nout=0, blocking=True)
await self.confirm_tx(tx.id) await self.confirm_tx(tx.id)
self.assertEqual(await account_balance(), { self.assertEqual(await account_balance(), {
'total': '9.277303', 'total': '9.277303',
@ -238,8 +238,7 @@ class WalletEncryptionAndSynchronization(CommandTestCase):
"carbon smart garage balance margin twelve" "carbon smart garage balance margin twelve"
) )
address = (await self.daemon2.wallet_manager.default_account.receiving.get_addresses(limit=1, only_usable=True))[0] address = (await self.daemon2.wallet_manager.default_account.receiving.get_addresses(limit=1, only_usable=True))[0]
sendtxid = await self.blockchain.send_to_address(address, 1) await self.send_to_address_and_wait(address, 1, 1, ledger=self.daemon2.ledger)
await self.confirm_tx(sendtxid, self.daemon2.ledger)
def assertWalletEncrypted(self, wallet_path, encrypted): def assertWalletEncrypted(self, wallet_path, encrypted):
with open(wallet_path) as opened: with open(wallet_path) as opened:
@ -296,6 +295,8 @@ class WalletEncryptionAndSynchronization(CommandTestCase):
'a97a6d6a4a8effd29d748901bb9789352519cd00b13d' 'a97a6d6a4a8effd29d748901bb9789352519cd00b13d'
), self.daemon2) ), self.daemon2)
await self.confirm_tx(channel['txid'], self.daemon2.ledger) await self.confirm_tx(channel['txid'], self.daemon2.ledger)
channel = await daemon2.jsonrpc_channel_create('@foo', '0.1', blocking=True)
await self.confirm_tx(channel.id, self.daemon2.ledger)
# both daemons will have the channel but only one has the cert so far # both daemons will have the channel but only one has the cert so far
self.assertItemCount(await daemon.jsonrpc_channel_list(), 1) self.assertItemCount(await daemon.jsonrpc_channel_list(), 1)

View file

@ -494,8 +494,7 @@ class ClaimSearchCommand(ClaimTestCase):
tx = await Transaction.claim_create( tx = await Transaction.claim_create(
'unknown', b'{"sources":{"lbry_sd_hash":""}}', 1, address, [self.account], self.account) 'unknown', b'{"sources":{"lbry_sd_hash":""}}', 1, address, [self.account], self.account)
await tx.sign([self.account]) await tx.sign([self.account])
await self.broadcast(tx) await self.broadcast_and_confirm(tx)
await self.confirm_tx(tx.id)
octet = await self.stream_create() octet = await self.stream_create()
video = await self.stream_create('chrome', file_path=self.video_file_name) video = await self.stream_create('chrome', file_path=self.video_file_name)
@ -1226,7 +1225,7 @@ class ChannelCommands(CommandTestCase):
data_to_sign = "CAFEBABE" data_to_sign = "CAFEBABE"
# claim new name # claim new name
await self.channel_create('@someotherchan') await self.channel_create('@someotherchan')
channel_tx = await self.daemon.jsonrpc_channel_create('@signer', '0.1') channel_tx = await self.daemon.jsonrpc_channel_create('@signer', '0.1', blocking=True)
await self.confirm_tx(channel_tx.id) await self.confirm_tx(channel_tx.id)
channel = channel_tx.outputs[0] channel = channel_tx.outputs[0]
signature1 = await self.out(self.daemon.jsonrpc_channel_sign(channel_name='@signer', hexdata=data_to_sign)) signature1 = await self.out(self.daemon.jsonrpc_channel_sign(channel_name='@signer', hexdata=data_to_sign))
@ -1373,7 +1372,7 @@ class StreamCommands(ClaimTestCase):
self.assertEqual('8.989893', (await self.daemon.jsonrpc_account_balance())['available']) self.assertEqual('8.989893', (await self.daemon.jsonrpc_account_balance())['available'])
result = await self.out(self.daemon.jsonrpc_account_send( result = await self.out(self.daemon.jsonrpc_account_send(
'5.0', await self.daemon.jsonrpc_address_unused(account2_id) '5.0', await self.daemon.jsonrpc_address_unused(account2_id), blocking=True
)) ))
await self.confirm_tx(result['txid']) await self.confirm_tx(result['txid'])
@ -1555,7 +1554,7 @@ class StreamCommands(ClaimTestCase):
) )
# test setting from env vars and starting from scratch # test setting from env vars and starting from scratch
await self.conductor.spv_node.stop(False) await self.conductor.spv_node.stop(False)
await self.conductor.spv_node.start(self.conductor.blockchain_node, await self.conductor.spv_node.start(self.conductor.lbcwallet_node,
extraconf={'BLOCKING_CHANNEL_IDS': blocking_channel_id, extraconf={'BLOCKING_CHANNEL_IDS': blocking_channel_id,
'FILTERING_CHANNEL_IDS': filtering_channel_id}) 'FILTERING_CHANNEL_IDS': filtering_channel_id})
await self.daemon.wallet_manager.reset() await self.daemon.wallet_manager.reset()
@ -2172,7 +2171,7 @@ class SupportCommands(CommandTestCase):
tip = await self.out( tip = await self.out(
self.daemon.jsonrpc_support_create( self.daemon.jsonrpc_support_create(
claim_id, '1.0', True, account_id=account2.id, wallet_id='wallet2', claim_id, '1.0', True, account_id=account2.id, wallet_id='wallet2',
funding_account_ids=[account2.id]) funding_account_ids=[account2.id], blocking=True)
) )
await self.confirm_tx(tip['txid']) await self.confirm_tx(tip['txid'])
@ -2204,7 +2203,7 @@ class SupportCommands(CommandTestCase):
support = await self.out( support = await self.out(
self.daemon.jsonrpc_support_create( self.daemon.jsonrpc_support_create(
claim_id, '2.0', False, account_id=account2.id, wallet_id='wallet2', claim_id, '2.0', False, account_id=account2.id, wallet_id='wallet2',
funding_account_ids=[account2.id]) funding_account_ids=[account2.id], blocking=True)
) )
await self.confirm_tx(support['txid']) await self.confirm_tx(support['txid'])

View file

@ -1,3 +1,4 @@
import unittest
from unittest import skipIf from unittest import skipIf
import asyncio import asyncio
import os import os
@ -36,8 +37,7 @@ class FileCommands(CommandTestCase):
tx_to_update.outputs[0], claim, 1, address, [self.account], self.account tx_to_update.outputs[0], claim, 1, address, [self.account], self.account
) )
await tx.sign([self.account]) await tx.sign([self.account])
await self.broadcast(tx) await self.broadcast_and_confirm(tx)
await self.confirm_tx(tx.id)
self.client_session = self.daemon.file_manager.source_managers['torrent'].torrent_session self.client_session = self.daemon.file_manager.source_managers['torrent'].torrent_session
self.client_session._session.add_dht_node(('localhost', 4040)) self.client_session._session.add_dht_node(('localhost', 4040))
self.client_session.wait_start = False # fixme: this is super slow on tests self.client_session.wait_start = False # fixme: this is super slow on tests
@ -222,6 +222,7 @@ class FileCommands(CommandTestCase):
await self.wait_files_to_complete() await self.wait_files_to_complete()
self.assertNotEqual(first_path, second_path) self.assertNotEqual(first_path, second_path)
@unittest.SkipTest # FIXME: claimname/updateclaim is gone. #3480 wip, unblock #3479"
async def test_file_list_updated_metadata_on_resolve(self): async def test_file_list_updated_metadata_on_resolve(self):
await self.stream_create('foo', '0.01') await self.stream_create('foo', '0.01')
txo = (await self.daemon.resolve(self.wallet.accounts, ['lbry://foo']))['lbry://foo'] txo = (await self.daemon.resolve(self.wallet.accounts, ['lbry://foo']))['lbry://foo']
@ -510,8 +511,7 @@ class FileCommands(CommandTestCase):
tx.outputs[0].claim.stream.fee.address_bytes = b'' tx.outputs[0].claim.stream.fee.address_bytes = b''
tx.outputs[0].script.generate() tx.outputs[0].script.generate()
await tx.sign([self.account]) await tx.sign([self.account])
await self.broadcast(tx) await self.broadcast_and_confirm(tx)
await self.confirm_tx(tx.id)
async def __raw_value_update_no_fee_amount(self, tx, claim_address): async def __raw_value_update_no_fee_amount(self, tx, claim_address):
tx = await self.daemon.jsonrpc_stream_update( tx = await self.daemon.jsonrpc_stream_update(
@ -521,8 +521,7 @@ class FileCommands(CommandTestCase):
tx.outputs[0].claim.stream.fee.message.ClearField('amount') tx.outputs[0].claim.stream.fee.message.ClearField('amount')
tx.outputs[0].script.generate() tx.outputs[0].script.generate()
await tx.sign([self.account]) await tx.sign([self.account])
await self.broadcast(tx) await self.broadcast_and_confirm(tx)
await self.confirm_tx(tx.id)
class DiskSpaceManagement(CommandTestCase): class DiskSpaceManagement(CommandTestCase):

View file

@ -80,7 +80,7 @@ class EpicAdventuresOfChris45(CommandTestCase):
# After some soul searching Chris decides that his story needs more # After some soul searching Chris decides that his story needs more
# heart and a better ending. He takes down the story and begins the rewrite. # heart and a better ending. He takes down the story and begins the rewrite.
abandon = await self.out(self.daemon.jsonrpc_stream_abandon(claim_id, blocking=False)) abandon = await self.out(self.daemon.jsonrpc_stream_abandon(claim_id, blocking=True))
self.assertEqual(abandon['inputs'][0]['claim_id'], claim_id) self.assertEqual(abandon['inputs'][0]['claim_id'], claim_id)
await self.confirm_tx(abandon['txid']) await self.confirm_tx(abandon['txid'])
@ -103,7 +103,7 @@ class EpicAdventuresOfChris45(CommandTestCase):
# 1 LBC to which Chris readily obliges # 1 LBC to which Chris readily obliges
ramsey_account_id = (await self.out(self.daemon.jsonrpc_account_create("Ramsey")))['id'] ramsey_account_id = (await self.out(self.daemon.jsonrpc_account_create("Ramsey")))['id']
ramsey_address = await self.daemon.jsonrpc_address_unused(ramsey_account_id) ramsey_address = await self.daemon.jsonrpc_address_unused(ramsey_account_id)
result = await self.out(self.daemon.jsonrpc_account_send('1.0', ramsey_address)) result = await self.out(self.daemon.jsonrpc_account_send('1.0', ramsey_address, blocking=True))
self.assertIn("txid", result) self.assertIn("txid", result)
await self.confirm_tx(result['txid']) await self.confirm_tx(result['txid'])
@ -133,7 +133,7 @@ class EpicAdventuresOfChris45(CommandTestCase):
# And voila, and bravo and encore! His Best Friend Ramsey read the story and immediately knew this was a hit # And voila, and bravo and encore! His Best Friend Ramsey read the story and immediately knew this was a hit
# Now to keep this claim winning on the lbry blockchain he immediately supports the claim # Now to keep this claim winning on the lbry blockchain he immediately supports the claim
tx = await self.out(self.daemon.jsonrpc_support_create( tx = await self.out(self.daemon.jsonrpc_support_create(
claim_id2, '0.2', account_id=ramsey_account_id claim_id2, '0.2', account_id=ramsey_account_id, blocking=True
)) ))
await self.confirm_tx(tx['txid']) await self.confirm_tx(tx['txid'])
@ -147,7 +147,7 @@ class EpicAdventuresOfChris45(CommandTestCase):
# Now he also wanted to support the original creator of the Award Winning Novel # Now he also wanted to support the original creator of the Award Winning Novel
# So he quickly decides to send a tip to him # So he quickly decides to send a tip to him
tx = await self.out( tx = await self.out(
self.daemon.jsonrpc_support_create(claim_id2, '0.3', tip=True, account_id=ramsey_account_id) self.daemon.jsonrpc_support_create(claim_id2, '0.3', tip=True, account_id=ramsey_account_id, blocking=True)
) )
await self.confirm_tx(tx['txid']) await self.confirm_tx(tx['txid'])
@ -158,7 +158,7 @@ class EpicAdventuresOfChris45(CommandTestCase):
await self.generate(5) await self.generate(5)
# Seeing the ravishing success of his novel Chris adds support to his claim too # Seeing the ravishing success of his novel Chris adds support to his claim too
tx = await self.out(self.daemon.jsonrpc_support_create(claim_id2, '0.4')) tx = await self.out(self.daemon.jsonrpc_support_create(claim_id2, '0.4', blocking=True))
await self.confirm_tx(tx['txid']) await self.confirm_tx(tx['txid'])
# And check if his support showed up # And check if his support showed up
@ -183,7 +183,7 @@ class EpicAdventuresOfChris45(CommandTestCase):
# But sadly Ramsey wasn't so pleased. It was hard for him to tell Chris... # But sadly Ramsey wasn't so pleased. It was hard for him to tell Chris...
# Chris, though a bit heartbroken, abandoned the claim for now, but instantly started working on new hit lyrics # Chris, though a bit heartbroken, abandoned the claim for now, but instantly started working on new hit lyrics
abandon = await self.out(self.daemon.jsonrpc_stream_abandon(txid=tx['txid'], nout=0, blocking=False)) abandon = await self.out(self.daemon.jsonrpc_stream_abandon(txid=tx['txid'], nout=0, blocking=True))
self.assertTrue(abandon['inputs'][0]['txid'], tx['txid']) self.assertTrue(abandon['inputs'][0]['txid'], tx['txid'])
await self.confirm_tx(abandon['txid']) await self.confirm_tx(abandon['txid'])

View file

@ -1,6 +1,7 @@
import asyncio import asyncio
import json import json
import hashlib import hashlib
import sys
from bisect import bisect_right from bisect import bisect_right
from binascii import hexlify, unhexlify from binascii import hexlify, unhexlify
from collections import defaultdict from collections import defaultdict
@ -31,13 +32,13 @@ class BaseResolveTestCase(CommandTestCase):
self.assertEqual(claim_from_es['effective_amount'], claim_from_db.effective_amount) self.assertEqual(claim_from_es['effective_amount'], claim_from_db.effective_amount)
def assertMatchDBClaim(self, expected, claim): def assertMatchDBClaim(self, expected, claim):
self.assertEqual(expected['claimId'], claim.claim_hash.hex()) self.assertEqual(expected['claimid'], claim.claim_hash.hex())
self.assertEqual(expected['validAtHeight'], claim.activation_height) self.assertEqual(expected['validatheight'], claim.activation_height)
self.assertEqual(expected['lastTakeoverHeight'], claim.last_takeover_height) self.assertEqual(expected['lasttakeoverheight'], claim.last_takeover_height)
self.assertEqual(expected['txId'], claim.tx_hash[::-1].hex()) self.assertEqual(expected['txid'], claim.tx_hash[::-1].hex())
self.assertEqual(expected['n'], claim.position) self.assertEqual(expected['n'], claim.position)
self.assertEqual(expected['amount'], claim.amount) self.assertEqual(expected['amount'], claim.amount)
self.assertEqual(expected['effectiveAmount'], claim.effective_amount) self.assertEqual(expected['effectiveamount'], claim.effective_amount)
async def assertResolvesToClaimId(self, name, claim_id): async def assertResolvesToClaimId(self, name, claim_id):
other = await self.resolve(name) other = await self.resolve(name)
@ -53,9 +54,10 @@ class BaseResolveTestCase(CommandTestCase):
self.assertEqual(claim_id, claim_from_es[0][0]['claim_hash'][::-1].hex()) self.assertEqual(claim_id, claim_from_es[0][0]['claim_hash'][::-1].hex())
async def assertNoClaimForName(self, name: str): async def assertNoClaimForName(self, name: str):
lbrycrd_winning = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name)) lbrycrd_winning = json.loads(await self.blockchain._cli_cmnd('getclaimsforname', name))
stream, channel, _, _ = await self.conductor.spv_node.server.bp.db.resolve(name) stream, channel, _, _ = await self.conductor.spv_node.server.bp.db.resolve(name)
self.assertNotIn('claimId', lbrycrd_winning) if 'claims' in lbrycrd_winning and lbrycrd_winning['claims'] is not None:
self.assertEqual(len(lbrycrd_winning['claims']), 0)
if stream is not None: if stream is not None:
self.assertIsInstance(stream, LookupError) self.assertIsInstance(stream, LookupError)
else: else:
@ -63,20 +65,23 @@ class BaseResolveTestCase(CommandTestCase):
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(name=name) claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(name=name)
self.assertListEqual([], claim_from_es[0]) self.assertListEqual([], claim_from_es[0])
async def assertNoClaim(self, claim_id: str): async def assertNoClaim(self, name: str, claim_id: str):
self.assertDictEqual( expected = json.loads(await self.blockchain._cli_cmnd('getclaimsfornamebyid', name, '["' + claim_id + '"]'))
{}, json.loads(await self.blockchain._cli_cmnd('getclaimbyid', claim_id)) if 'claims' in expected and expected['claims'] is not None:
) # ensure that if we do have the matching claim that it is not active
self.assertEqual(expected['claims'][0]['effectiveamount'], 0)
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(claim_id=claim_id) claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(claim_id=claim_id)
self.assertListEqual([], claim_from_es[0]) self.assertListEqual([], claim_from_es[0])
claim = await self.conductor.spv_node.server.bp.db.fs_getclaimbyid(claim_id) claim = await self.conductor.spv_node.server.bp.db.fs_getclaimbyid(claim_id)
self.assertIsNone(claim) self.assertIsNone(claim)
async def assertMatchWinningClaim(self, name): async def assertMatchWinningClaim(self, name):
expected = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name)) expected = json.loads(await self.blockchain._cli_cmnd('getclaimsfornamebybid', name, "[0]"))
stream, channel, _, _ = await self.conductor.spv_node.server.bp.db.resolve(name) stream, channel, _, _ = await self.conductor.spv_node.server.bp.db.resolve(name)
claim = stream if stream else channel claim = stream if stream else channel
await self._assertMatchClaim(expected, claim) expected['claims'][0]['lasttakeoverheight'] = expected['lasttakeoverheight']
await self._assertMatchClaim(expected['claims'][0], claim)
return claim return claim
async def _assertMatchClaim(self, expected, claim): async def _assertMatchClaim(self, expected, claim):
@ -86,70 +91,81 @@ class BaseResolveTestCase(CommandTestCase):
) )
self.assertEqual(len(claim_from_es[0]), 1) self.assertEqual(len(claim_from_es[0]), 1)
self.assertMatchESClaim(claim_from_es[0][0], claim) self.assertMatchESClaim(claim_from_es[0][0], claim)
self._check_supports(claim.claim_hash.hex(), expected['supports'], claim_from_es[0][0]['support_amount']) self._check_supports(claim.claim_hash.hex(), expected.get('supports', []),
claim_from_es[0][0]['support_amount'])
async def assertMatchClaim(self, claim_id, is_active_in_lbrycrd=True): async def assertMatchClaim(self, name, claim_id, is_active_in_lbrycrd=True):
expected = json.loads(await self.blockchain._cli_cmnd('getclaimbyid', claim_id))
claim = await self.conductor.spv_node.server.bp.db.fs_getclaimbyid(claim_id) claim = await self.conductor.spv_node.server.bp.db.fs_getclaimbyid(claim_id)
if is_active_in_lbrycrd:
if not expected:
self.assertIsNone(claim)
return
self.assertMatchDBClaim(expected, claim)
else:
self.assertDictEqual({}, expected)
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search( claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(
claim_id=claim.claim_hash.hex() claim_id=claim.claim_hash.hex()
) )
self.assertEqual(len(claim_from_es[0]), 1) self.assertEqual(len(claim_from_es[0]), 1)
self.assertEqual(claim_from_es[0][0]['claim_hash'][::-1].hex(), claim.claim_hash.hex()) self.assertEqual(claim_from_es[0][0]['claim_hash'][::-1].hex(), claim.claim_hash.hex())
self.assertMatchESClaim(claim_from_es[0][0], claim) self.assertMatchESClaim(claim_from_es[0][0], claim)
self._check_supports(
claim.claim_hash.hex(), expected.get('supports', []), claim_from_es[0][0]['support_amount'], expected = json.loads(await self.blockchain._cli_cmnd('getclaimsfornamebyid', name, '["' + claim_id + '"]'))
is_active_in_lbrycrd if is_active_in_lbrycrd:
) if not expected:
self.assertIsNone(claim)
return
expected['claims'][0]['lasttakeoverheight'] = expected['lasttakeoverheight']
self.assertMatchDBClaim(expected['claims'][0], claim)
self._check_supports(claim.claim_hash.hex(), expected['claims'][0].get('supports', []),
claim_from_es[0][0]['support_amount'])
else:
if 'claims' in expected and expected['claims'] is not None:
# ensure that if we do have the matching claim that it is not active
self.assertEqual(expected['claims'][0]['effectiveamount'], 0)
return claim return claim
async def assertMatchClaimIsWinning(self, name, claim_id): async def assertMatchClaimIsWinning(self, name, claim_id):
self.assertEqual(claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) self.assertEqual(claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex())
await self.assertMatchClaimsForName(name) await self.assertMatchClaimsForName(name)
def _check_supports(self, claim_id, lbrycrd_supports, es_support_amount, is_active_in_lbrycrd=True): def _check_supports(self, claim_id, lbrycrd_supports, es_support_amount):
total_amount = 0 total_lbrycrd_amount = 0.0
total_es_amount = 0.0
active_es_amount = 0.0
db = self.conductor.spv_node.server.bp.db db = self.conductor.spv_node.server.bp.db
es_supports = db.get_supports(bytes.fromhex(claim_id))
for i, (tx_num, position, amount) in enumerate(db.get_supports(bytes.fromhex(claim_id))): # we're only concerned about active supports here, and they should match
total_amount += amount self.assertTrue(len(es_supports) >= len(lbrycrd_supports))
if is_active_in_lbrycrd:
support = lbrycrd_supports[i] for i, (tx_num, position, amount) in enumerate(es_supports):
self.assertEqual(support['txId'], db.prefix_db.tx_hash.get(tx_num, deserialize_value=False)[::-1].hex()) total_es_amount += amount
self.assertEqual(support['n'], position) valid_height = db.get_activation(tx_num, position, is_support=True)
if valid_height > db.db_height:
continue
active_es_amount += amount
txid = db.prefix_db.tx_hash.get(tx_num, deserialize_value=False)[::-1].hex()
support = next(filter(lambda s: s['txid'] == txid and s['n'] == position, lbrycrd_supports))
total_lbrycrd_amount += support['amount']
self.assertEqual(support['height'], bisect_right(db.tx_counts, tx_num)) self.assertEqual(support['height'], bisect_right(db.tx_counts, tx_num))
self.assertEqual(support['validAtHeight'], db.get_activation(tx_num, position, is_support=True)) self.assertEqual(support['validatheight'], valid_height)
self.assertEqual(total_amount, es_support_amount, f"lbrycrd support amount: {total_amount} vs es: {es_support_amount}")
self.assertEqual(total_es_amount, es_support_amount)
self.assertEqual(active_es_amount, total_lbrycrd_amount)
async def assertMatchClaimsForName(self, name): async def assertMatchClaimsForName(self, name):
expected = json.loads(await self.blockchain._cli_cmnd('getclaimsforname', name)) expected = json.loads(await self.blockchain._cli_cmnd('getclaimsforname', name, "", "true"))
db = self.conductor.spv_node.server.bp.db db = self.conductor.spv_node.server.bp.db
# self.assertEqual(len(expected['claims']), len(db_claims.claims))
# self.assertEqual(expected['lastTakeoverHeight'], db_claims.lastTakeoverHeight)
last_takeover = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))['lastTakeoverHeight']
for c in expected['claims']: for c in expected['claims']:
c['lastTakeoverHeight'] = last_takeover c['lasttakeoverheight'] = expected['lasttakeoverheight']
claim_id = c['claimId'] claim_id = c['claimid']
claim_hash = bytes.fromhex(claim_id) claim_hash = bytes.fromhex(claim_id)
claim = db._fs_get_claim_by_hash(claim_hash) claim = db._fs_get_claim_by_hash(claim_hash)
self.assertMatchDBClaim(c, claim) self.assertMatchDBClaim(c, claim)
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search( claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(
claim_id=c['claimId'] claim_id=claim_id
) )
self.assertEqual(len(claim_from_es[0]), 1) self.assertEqual(len(claim_from_es[0]), 1)
self.assertEqual(claim_from_es[0][0]['claim_hash'][::-1].hex(), c['claimId']) self.assertEqual(claim_from_es[0][0]['claim_hash'][::-1].hex(), claim_id)
self.assertMatchESClaim(claim_from_es[0][0], claim) self.assertMatchESClaim(claim_from_es[0][0], claim)
self._check_supports(c['claimId'], c['supports'], claim_from_es[0][0]['support_amount']) self._check_supports(claim_id, c.get('supports', []),
claim_from_es[0][0]['support_amount'])
class ResolveCommand(BaseResolveTestCase): class ResolveCommand(BaseResolveTestCase):
@ -262,18 +278,18 @@ class ResolveCommand(BaseResolveTestCase):
self.assertEqual(claim['confirmations'], json.loads(tx_details)['confirmations']) self.assertEqual(claim['confirmations'], json.loads(tx_details)['confirmations'])
# resolve handles invalid data # resolve handles invalid data
await self.blockchain_claim_name("gibberish", hexlify(b"{'invalid':'json'}").decode(), "0.1") # await self.blockchain_claim_name("gibberish", hexlify(b"{'invalid':'json'}").decode(), "0.1")
await self.generate(1) # await self.generate(1)
response = await self.out(self.daemon.jsonrpc_resolve("lbry://gibberish")) # response = await self.out(self.daemon.jsonrpc_resolve("lbry://gibberish"))
self.assertSetEqual({'lbry://gibberish'}, set(response)) # self.assertSetEqual({'lbry://gibberish'}, set(response))
claim = response['lbry://gibberish'] # claim = response['lbry://gibberish']
self.assertEqual(claim['name'], 'gibberish') # self.assertEqual(claim['name'], 'gibberish')
self.assertNotIn('value', claim) # self.assertNotIn('value', claim)
# resolve retries # resolve retries
await self.conductor.spv_node.stop() await self.conductor.spv_node.stop()
resolve_task = asyncio.create_task(self.resolve('foo')) resolve_task = asyncio.create_task(self.resolve('foo'))
await self.conductor.spv_node.start(self.conductor.blockchain_node) await self.conductor.spv_node.start(self.conductor.lbcwallet_node)
self.assertIsNotNone((await resolve_task)['claim_id']) self.assertIsNotNone((await resolve_task)['claim_id'])
async def test_winning_by_effective_amount(self): async def test_winning_by_effective_amount(self):
@ -443,16 +459,16 @@ class ResolveCommand(BaseResolveTestCase):
self.assertEqual(one, claim6['name']) self.assertEqual(one, claim6['name'])
async def test_resolve_old_claim(self): async def test_resolve_old_claim(self):
channel = await self.daemon.jsonrpc_channel_create('@olds', '1.0') channel = await self.daemon.jsonrpc_channel_create('@olds', '1.0', blocking=True)
await self.confirm_tx(channel.id) await self.confirm_tx(channel.id)
address = channel.outputs[0].get_address(self.account.ledger) address = channel.outputs[0].get_address(self.account.ledger)
claim = generate_signed_legacy(address, channel.outputs[0]) claim = generate_signed_legacy(address, channel.outputs[0])
tx = await Transaction.claim_create('example', claim.SerializeToString(), 1, address, [self.account], self.account) tx = await Transaction.claim_create('example', claim.SerializeToString(), 1, address, [self.account], self.account)
await tx.sign([self.account]) await tx.sign([self.account])
await self.broadcast(tx) await self.broadcast_and_confirm(tx)
await self.confirm_tx(tx.id)
response = await self.resolve('@olds/example') response = await self.resolve('@olds/example')
self.assertTrue('is_channel_signature_valid' in response, str(response))
self.assertTrue(response['is_channel_signature_valid']) self.assertTrue(response['is_channel_signature_valid'])
claim.publisherSignature.signature = bytes(reversed(claim.publisherSignature.signature)) claim.publisherSignature.signature = bytes(reversed(claim.publisherSignature.signature))
@ -460,8 +476,7 @@ class ResolveCommand(BaseResolveTestCase):
'bad_example', claim.SerializeToString(), 1, address, [self.account], self.account 'bad_example', claim.SerializeToString(), 1, address, [self.account], self.account
) )
await tx.sign([self.account]) await tx.sign([self.account])
await self.broadcast(tx) await self.broadcast_and_confirm(tx)
await self.confirm_tx(tx.id)
response = await self.resolve('bad_example') response = await self.resolve('bad_example')
self.assertFalse(response['is_channel_signature_valid']) self.assertFalse(response['is_channel_signature_valid'])
@ -646,7 +661,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
self.assertEqual(height, self.conductor.spv_node.server.bp.db.db_height) self.assertEqual(height, self.conductor.spv_node.server.bp.db.db_height)
await self.assertMatchClaimIsWinning(name, winning_claim_id) await self.assertMatchClaimIsWinning(name, winning_claim_id)
for non_winning in non_winning_claims: for non_winning in non_winning_claims:
claim = await self.assertMatchClaim( claim = await self.assertMatchClaim(name,
non_winning.claim_id, is_active_in_lbrycrd=non_winning.active_in_lbrycrd non_winning.claim_id, is_active_in_lbrycrd=non_winning.active_in_lbrycrd
) )
self.assertEqual(non_winning.activation_height, claim.activation_height) self.assertEqual(non_winning.activation_height, claim.activation_height)
@ -992,7 +1007,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
name = 'test' name = 'test'
await self.generate(494) await self.generate(494)
address = (await self.account.receiving.get_addresses(True))[0] address = (await self.account.receiving.get_addresses(True))[0]
await self.blockchain.send_to_address(address, 400.0) await self.send_to_address_and_wait(address, 400.0)
await self.account.ledger.on_address.first await self.account.ledger.on_address.first
await self.generate(100) await self.generate(100)
self.assertEqual(800, self.conductor.spv_node.server.bp.db.db_height) self.assertEqual(800, self.conductor.spv_node.server.bp.db.db_height)
@ -1334,7 +1349,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
await self.generate(1) await self.generate(1)
await self.assertMatchClaim(first_claim_id) await self.assertMatchClaim(name, first_claim_id)
await self.assertMatchClaimIsWinning(name, second_claim_id) await self.assertMatchClaimIsWinning(name, second_claim_id)
async def test_remove_controlling_support(self): async def test_remove_controlling_support(self):
@ -1405,12 +1420,12 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
await self.generate(32) await self.generate(32)
second_claim_id = (await self.stream_create(name, '0.01', allow_duplicate_name=True))['outputs'][0]['claim_id'] second_claim_id = (await self.stream_create(name, '0.01', allow_duplicate_name=True))['outputs'][0]['claim_id']
await self.assertNoClaim(second_claim_id) await self.assertNoClaim(name, second_claim_id)
self.assertEqual( self.assertEqual(
len((await self.conductor.spv_node.server.bp.db.search_index.search(claim_name=name))[0]), 1 len((await self.conductor.spv_node.server.bp.db.search_index.search(claim_name=name))[0]), 1
) )
await self.generate(1) await self.generate(1)
await self.assertMatchClaim(second_claim_id) await self.assertMatchClaim(name, second_claim_id)
self.assertEqual( self.assertEqual(
len((await self.conductor.spv_node.server.bp.db.search_index.search(claim_name=name))[0]), 2 len((await self.conductor.spv_node.server.bp.db.search_index.search(claim_name=name))[0]), 2
) )
@ -1570,7 +1585,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
# reorg the last block dropping our claim tx # reorg the last block dropping our claim tx
await self.blockchain.invalidate_block(invalidated_block_hash) await self.blockchain.invalidate_block(invalidated_block_hash)
await self.blockchain.clear_mempool() await self.conductor.clear_mempool()
await self.blockchain.generate(2) await self.blockchain.generate(2)
# wait for the client to catch up and verify the reorg # wait for the client to catch up and verify the reorg
@ -1649,7 +1664,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
# reorg the last block dropping our claim tx # reorg the last block dropping our claim tx
await self.blockchain.invalidate_block(invalidated_block_hash) await self.blockchain.invalidate_block(invalidated_block_hash)
await self.blockchain.clear_mempool() await self.conductor.clear_mempool()
await self.blockchain.generate(2) await self.blockchain.generate(2)
# wait for the client to catch up and verify the reorg # wait for the client to catch up and verify the reorg

View file

@ -21,9 +21,8 @@ class BasicTransactionTest(IntegrationTestCase):
[asyncio.ensure_future(self.on_address_update(address1)), [asyncio.ensure_future(self.on_address_update(address1)),
asyncio.ensure_future(self.on_address_update(address2))] asyncio.ensure_future(self.on_address_update(address2))]
)) ))
sendtxid1 = await self.blockchain.send_to_address(address1, 5) await self.send_to_address_and_wait(address1, 5)
sendtxid2 = await self.blockchain.send_to_address(address2, 5) await self.send_to_address_and_wait(address2, 5, 1)
await self.blockchain.generate(1)
await notifications await notifications
self.assertEqual(d2l(await self.account.get_balance()), '10.0') self.assertEqual(d2l(await self.account.get_balance()), '10.0')

View file

@ -1,3 +1,5 @@
import unittest
from lbry.testcase import CommandTestCase from lbry.testcase import CommandTestCase
@ -6,7 +8,7 @@ class TransactionCommandsTestCase(CommandTestCase):
async def test_transaction_show(self): async def test_transaction_show(self):
# local tx # local tx
result = await self.out(self.daemon.jsonrpc_account_send( result = await self.out(self.daemon.jsonrpc_account_send(
'5.0', await self.daemon.jsonrpc_address_unused(self.account.id) '5.0', await self.daemon.jsonrpc_address_unused(self.account.id), blocking=True
)) ))
await self.confirm_tx(result['txid']) await self.confirm_tx(result['txid'])
tx = await self.daemon.jsonrpc_transaction_show(result['txid']) tx = await self.daemon.jsonrpc_transaction_show(result['txid'])
@ -27,10 +29,9 @@ class TransactionCommandsTestCase(CommandTestCase):
self.assertFalse(result['success']) self.assertFalse(result['success'])
async def test_utxo_release(self): async def test_utxo_release(self):
sendtxid = await self.blockchain.send_to_address( await self.send_to_address_and_wait(
await self.account.receiving.get_or_create_usable_address(), 1 await self.account.receiving.get_or_create_usable_address(), 1, 1
) )
await self.confirm_tx(sendtxid)
await self.assertBalance(self.account, '11.0') await self.assertBalance(self.account, '11.0')
await self.ledger.reserve_outputs(await self.account.get_utxos()) await self.ledger.reserve_outputs(await self.account.get_utxos())
await self.assertBalance(self.account, '0.0') await self.assertBalance(self.account, '0.0')
@ -53,14 +54,13 @@ class TestSegwit(CommandTestCase):
p2sh_txid2 = await self.blockchain.send_to_address(p2sh_address2, '1.0') p2sh_txid2 = await self.blockchain.send_to_address(p2sh_address2, '1.0')
bech32_txid1 = await self.blockchain.send_to_address(bech32_address1, '1.0') bech32_txid1 = await self.blockchain.send_to_address(bech32_address1, '1.0')
bech32_txid2 = await self.blockchain.send_to_address(bech32_address2, '1.0') bech32_txid2 = await self.blockchain.send_to_address(bech32_address2, '1.0')
await self.generate(1) await self.generate(1)
# P2SH & BECH32 can pay to P2SH address # P2SH & BECH32 can pay to P2SH address
tx = await self.blockchain.create_raw_transaction([ tx = await self.blockchain.create_raw_transaction([
{"txid": p2sh_txid1, "vout": 0}, {"txid": p2sh_txid1, "vout": 0},
{"txid": bech32_txid1, "vout": 0}, {"txid": bech32_txid1, "vout": 0},
], [{p2sh_address3: '1.9'}] ], {p2sh_address3: 1.9}
) )
tx = await self.blockchain.sign_raw_transaction_with_wallet(tx) tx = await self.blockchain.sign_raw_transaction_with_wallet(tx)
p2sh_txid3 = await self.blockchain.send_raw_transaction(tx) p2sh_txid3 = await self.blockchain.send_raw_transaction(tx)
@ -71,7 +71,7 @@ class TestSegwit(CommandTestCase):
tx = await self.blockchain.create_raw_transaction([ tx = await self.blockchain.create_raw_transaction([
{"txid": p2sh_txid2, "vout": 0}, {"txid": p2sh_txid2, "vout": 0},
{"txid": bech32_txid2, "vout": 0}, {"txid": bech32_txid2, "vout": 0},
], [{bech32_address3: '1.9'}] ], {bech32_address3: 1.9}
) )
tx = await self.blockchain.sign_raw_transaction_with_wallet(tx) tx = await self.blockchain.sign_raw_transaction_with_wallet(tx)
bech32_txid3 = await self.blockchain.send_raw_transaction(tx) bech32_txid3 = await self.blockchain.send_raw_transaction(tx)
@ -83,12 +83,9 @@ class TestSegwit(CommandTestCase):
tx = await self.blockchain.create_raw_transaction([ tx = await self.blockchain.create_raw_transaction([
{"txid": p2sh_txid3, "vout": 0}, {"txid": p2sh_txid3, "vout": 0},
{"txid": bech32_txid3, "vout": 0}, {"txid": bech32_txid3, "vout": 0},
], [{address: '3.5'}] ], {address: 3.5}
) )
tx = await self.blockchain.sign_raw_transaction_with_wallet(tx) tx = await self.blockchain.sign_raw_transaction_with_wallet(tx)
txid = await self.blockchain.send_raw_transaction(tx) txid = await self.blockchain.send_raw_transaction(tx)
await self.on_transaction_id(txid) await self.generate_and_wait(1, [txid])
await self.generate(1)
await self.on_transaction_id(txid)
await self.assertBalance(self.account, '13.5') await self.assertBalance(self.account, '13.5')

View file

@ -1,7 +1,7 @@
import asyncio import asyncio
import random import random
from itertools import chain
import lbry.wallet.rpc.jsonrpc
from lbry.wallet.transaction import Transaction, Output, Input from lbry.wallet.transaction import Transaction, Output, Input
from lbry.testcase import IntegrationTestCase from lbry.testcase import IntegrationTestCase
from lbry.wallet.util import satoshis_to_coins, coins_to_satoshis from lbry.wallet.util import satoshis_to_coins, coins_to_satoshis
@ -19,10 +19,10 @@ class BasicTransactionTests(IntegrationTestCase):
# to the 10th receiving address for a total of 30 UTXOs on the entire account # to the 10th receiving address for a total of 30 UTXOs on the entire account
for i in range(10): for i in range(10):
notification = asyncio.ensure_future(self.on_address_update(addresses[i])) notification = asyncio.ensure_future(self.on_address_update(addresses[i]))
txid = await self.blockchain.send_to_address(addresses[i], 10) _ = await self.send_to_address_and_wait(addresses[i], 10)
await notification await notification
notification = asyncio.ensure_future(self.on_address_update(addresses[9])) notification = asyncio.ensure_future(self.on_address_update(addresses[9]))
txid = await self.blockchain.send_to_address(addresses[9], 10) _ = await self.send_to_address_and_wait(addresses[9], 10)
await notification await notification
# use batching to reduce issues with send_to_address on cli # use batching to reduce issues with send_to_address on cli
@ -88,12 +88,10 @@ class BasicTransactionTests(IntegrationTestCase):
await self.assertBalance(account2, '0.0') await self.assertBalance(account2, '0.0')
addresses = await account1.receiving.get_addresses() addresses = await account1.receiving.get_addresses()
txids = await asyncio.gather(*( txids = []
self.blockchain.send_to_address(address, 1.1) for address in addresses[:5] for address in addresses[:5]:
)) txids.append(await self.send_to_address_and_wait(address, 1.1))
await asyncio.wait([self.on_transaction_id(txid) for txid in txids]) # mempool await self.generate_and_wait(1, txids)
await self.blockchain.generate(1)
await asyncio.wait([self.on_transaction_id(txid) for txid in txids]) # confirmed
await self.assertBalance(account1, '5.5') await self.assertBalance(account1, '5.5')
await self.assertBalance(account2, '0.0') await self.assertBalance(account2, '0.0')
@ -148,11 +146,8 @@ class BasicTransactionTests(IntegrationTestCase):
return summary return summary
self.conductor.spv_node.server.bp.mempool.transaction_summaries = random_summary self.conductor.spv_node.server.bp.mempool.transaction_summaries = random_summary
# 10 unconfirmed txs, all from blockchain wallet # 10 unconfirmed txs, all from blockchain wallet
sends = [self.blockchain.send_to_address(address, 10) for _ in range(10)] for i in range(10):
# use batching to reduce issues with send_to_address on cli await self.send_to_address_and_wait(address, 10)
for batch in range(0, len(sends), 10):
txids = await asyncio.gather(*sends[batch:batch + 10])
await asyncio.wait([self.on_transaction_id(txid) for txid in txids])
remote_status = await self.ledger.network.subscribe_address(address) remote_status = await self.ledger.network.subscribe_address(address)
self.assertTrue(await self.ledger.update_history(address, remote_status)) self.assertTrue(await self.ledger.update_history(address, remote_status))
# 20 unconfirmed txs, 10 from blockchain, 10 from local to local # 20 unconfirmed txs, 10 from blockchain, 10 from local to local
@ -170,8 +165,7 @@ class BasicTransactionTests(IntegrationTestCase):
remote_status = await self.ledger.network.subscribe_address(address) remote_status = await self.ledger.network.subscribe_address(address)
self.assertTrue(await self.ledger.update_history(address, remote_status)) self.assertTrue(await self.ledger.update_history(address, remote_status))
# server history grows unordered # server history grows unordered
txid = await self.blockchain.send_to_address(address, 1) await self.send_to_address_and_wait(address, 1)
await self.on_transaction_id(txid)
self.assertTrue(await self.ledger.update_history(address, remote_status)) self.assertTrue(await self.ledger.update_history(address, remote_status))
self.assertEqual(21, len((await self.ledger.get_local_status_and_history(address))[1])) self.assertEqual(21, len((await self.ledger.get_local_status_and_history(address))[1]))
self.assertEqual(0, len(self.ledger._known_addresses_out_of_sync)) self.assertEqual(0, len(self.ledger._known_addresses_out_of_sync))
@ -195,7 +189,7 @@ class BasicTransactionTests(IntegrationTestCase):
self.ledger, 2000000000000, [self.account], set_reserved=False, return_insufficient_funds=True self.ledger, 2000000000000, [self.account], set_reserved=False, return_insufficient_funds=True
) )
got_amounts = [estimator.effective_amount for estimator in spendable] got_amounts = [estimator.effective_amount for estimator in spendable]
self.assertListEqual(amounts, got_amounts) self.assertListEqual(sorted(amounts), sorted(got_amounts))
async def test_sqlite_coin_chooser(self): async def test_sqlite_coin_chooser(self):
wallet_manager = WalletManager([self.wallet], {self.ledger.get_id(): self.ledger}) wallet_manager = WalletManager([self.wallet], {self.ledger.get_id(): self.ledger})
@ -206,26 +200,26 @@ class BasicTransactionTests(IntegrationTestCase):
other_account = self.wallet.generate_account(self.ledger) other_account = self.wallet.generate_account(self.ledger)
other_address = await other_account.receiving.get_or_create_usable_address() other_address = await other_account.receiving.get_or_create_usable_address()
self.ledger.coin_selection_strategy = 'sqlite' self.ledger.coin_selection_strategy = 'sqlite'
await self.ledger.subscribe_account(self.account) await self.ledger.subscribe_account(other_account)
accepted = asyncio.ensure_future(self.on_address_update(address)) accepted = asyncio.ensure_future(self.on_address_update(address))
txid = await self.blockchain.send_to_address(address, 1.0) _ = await self.send_to_address_and_wait(address, 1.0)
await accepted await accepted
accepted = asyncio.ensure_future(self.on_address_update(address)) accepted = asyncio.ensure_future(self.on_address_update(address))
txid = await self.blockchain.send_to_address(address, 1.0) _ = await self.send_to_address_and_wait(address, 1.0)
await accepted await accepted
accepted = asyncio.ensure_future(self.on_address_update(address)) accepted = asyncio.ensure_future(self.on_address_update(address))
txid = await self.blockchain.send_to_address(address, 3.0) _ = await self.send_to_address_and_wait(address, 3.0)
await accepted await accepted
accepted = asyncio.ensure_future(self.on_address_update(address)) accepted = asyncio.ensure_future(self.on_address_update(address))
txid = await self.blockchain.send_to_address(address, 5.0) _ = await self.send_to_address_and_wait(address, 5.0)
await accepted await accepted
accepted = asyncio.ensure_future(self.on_address_update(address)) accepted = asyncio.ensure_future(self.on_address_update(address))
txid = await self.blockchain.send_to_address(address, 10.0) _ = await self.send_to_address_and_wait(address, 10.0)
await accepted await accepted
await self.assertBalance(self.account, '20.0') await self.assertBalance(self.account, '20.0')
@ -266,6 +260,8 @@ class BasicTransactionTests(IntegrationTestCase):
async def broadcast(tx): async def broadcast(tx):
try: try:
return await real_broadcast(tx) return await real_broadcast(tx)
except lbry.wallet.rpc.jsonrpc.RPCError:
pass
finally: finally:
e.set() e.set()