Compare commits
31 commits
master
...
lbcd-integ
Author | SHA1 | Date | |
---|---|---|---|
|
378a6471ef | ||
|
f9c1222819 | ||
|
2bd60f692c | ||
|
2badd83294 | ||
|
c156332e01 | ||
|
beabafeda8 | ||
|
3aba2bf7be | ||
|
d231cdd4d0 | ||
|
7211274a70 | ||
|
f1820b42d3 | ||
|
2bd3605961 | ||
|
403fc1e0f5 | ||
|
425b4bdd2c | ||
|
9d7ec878ad | ||
|
93c472a352 | ||
|
a123fbc842 | ||
|
721834cea2 | ||
|
73ac42b786 | ||
|
20e6f87af9 | ||
|
787a50d6cc | ||
|
fbdc7100ea | ||
|
152a81c079 | ||
|
dbd3a49141 | ||
|
1564536c50 | ||
|
ae49760121 | ||
|
967cc9f0ab | ||
|
223d11200b | ||
|
13dec8756b | ||
|
d853fe0f6a | ||
|
83ff5c4134 | ||
|
bf3dec9c89 |
29 changed files with 775 additions and 391 deletions
|
@ -15,7 +15,6 @@ RUN apt-get update && \
|
|||
build-essential \
|
||||
automake libtool \
|
||||
pkg-config \
|
||||
libleveldb-dev \
|
||||
python3.7 \
|
||||
python3-dev \
|
||||
python3-pip \
|
||||
|
|
58
docker/Dockerfile.wallet_server_deploy
Normal file
58
docker/Dockerfile.wallet_server_deploy
Normal file
|
@ -0,0 +1,58 @@
|
|||
# FROM debian:10-slim
|
||||
FROM python:3.7.12-slim-buster
|
||||
|
||||
ARG user=lbry
|
||||
ARG db_dir=/database
|
||||
ARG projects_dir=/home/$user
|
||||
|
||||
ARG DOCKER_TAG
|
||||
ARG DOCKER_COMMIT=docker
|
||||
ENV DOCKER_TAG=$DOCKER_TAG DOCKER_COMMIT=$DOCKER_COMMIT
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get -y --no-install-recommends install \
|
||||
wget \
|
||||
tar unzip \
|
||||
build-essential \
|
||||
automake libtool \
|
||||
pkg-config \
|
||||
librocksdb-dev
|
||||
# python3.7 \
|
||||
# python3-dev \
|
||||
# python3-pip \
|
||||
# python3-wheel \
|
||||
# python3-cffi \
|
||||
# python3-setuptools && \
|
||||
# update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 && \
|
||||
# rm -rf /var/lib/apt/lists/*
|
||||
|
||||
RUN pip install lbry-rocksdb
|
||||
RUN pip install uvloop
|
||||
RUN groupadd -g 999 $user && useradd -m -u 999 -g $user $user
|
||||
RUN mkdir -p $db_dir
|
||||
RUN chown -R $user:$user $db_dir
|
||||
|
||||
COPY . $projects_dir
|
||||
RUN chown -R $user:$user $projects_dir
|
||||
|
||||
USER $user
|
||||
WORKDIR $projects_dir
|
||||
|
||||
RUN make install
|
||||
RUN python3 docker/set_build.py
|
||||
RUN rm ~/.cache -rf
|
||||
|
||||
# entry point
|
||||
ARG host=0.0.0.0
|
||||
ARG tcp_port=50001
|
||||
ARG daemon_url=http://lbry:lbry@192.99.151.178:9245/
|
||||
VOLUME $db_dir
|
||||
ENV TCP_PORT=$tcp_port
|
||||
ENV HOST=$host
|
||||
ENV DAEMON_URL=$daemon_url
|
||||
ENV DB_DIRECTORY=$db_dir
|
||||
ENV MAX_SESSIONS=1000000000
|
||||
ENV MAX_SEND=1000000000000000000
|
||||
ENV EVENT_LOOP_POLICY=uvloop
|
||||
COPY ./docker/wallet_server_entrypoint.sh /entrypoint.sh
|
||||
ENTRYPOINT ["/entrypoint.sh"]
|
|
@ -19,7 +19,7 @@ from lbry.conf import Config
|
|||
from lbry.wallet.util import satoshis_to_coins
|
||||
from lbry.wallet.dewies import lbc_to_dewies
|
||||
from lbry.wallet.orchstr8 import Conductor
|
||||
from lbry.wallet.orchstr8.node import BlockchainNode, WalletNode, HubNode
|
||||
from lbry.wallet.orchstr8.node import LBCWalletNode, WalletNode, HubNode
|
||||
from lbry.schema.claim import Claim
|
||||
|
||||
from lbry.extras.daemon.daemon import Daemon, jsonrpc_dumps_pretty
|
||||
|
@ -230,7 +230,7 @@ class IntegrationTestCase(AsyncioTestCase):
|
|||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.conductor: Optional[Conductor] = None
|
||||
self.blockchain: Optional[BlockchainNode] = None
|
||||
self.blockchain: Optional[LBCWalletNode] = None
|
||||
self.hub: Optional[HubNode] = None
|
||||
self.wallet_node: Optional[WalletNode] = None
|
||||
self.manager: Optional[WalletManager] = None
|
||||
|
@ -240,15 +240,17 @@ class IntegrationTestCase(AsyncioTestCase):
|
|||
|
||||
async def asyncSetUp(self):
|
||||
self.conductor = Conductor(seed=self.SEED)
|
||||
await self.conductor.start_blockchain()
|
||||
self.addCleanup(self.conductor.stop_blockchain)
|
||||
await self.conductor.start_lbcd()
|
||||
self.addCleanup(self.conductor.stop_lbcd)
|
||||
await self.conductor.start_lbcwallet()
|
||||
self.addCleanup(self.conductor.stop_lbcwallet)
|
||||
await self.conductor.start_spv()
|
||||
self.addCleanup(self.conductor.stop_spv)
|
||||
await self.conductor.start_wallet()
|
||||
self.addCleanup(self.conductor.stop_wallet)
|
||||
await self.conductor.start_hub()
|
||||
self.addCleanup(self.conductor.stop_hub)
|
||||
self.blockchain = self.conductor.blockchain_node
|
||||
self.blockchain = self.conductor.lbcwallet_node
|
||||
self.hub = self.conductor.hub_node
|
||||
self.wallet_node = self.conductor.wallet_node
|
||||
self.manager = self.wallet_node.manager
|
||||
|
@ -263,6 +265,13 @@ class IntegrationTestCase(AsyncioTestCase):
|
|||
def broadcast(self, tx):
|
||||
return self.ledger.broadcast(tx)
|
||||
|
||||
async def broadcast_and_confirm(self, tx, ledger=None):
|
||||
ledger = ledger or self.ledger
|
||||
notifications = asyncio.create_task(ledger.wait(tx))
|
||||
await ledger.broadcast(tx)
|
||||
await notifications
|
||||
await self.generate_and_wait(1, [tx.id], ledger)
|
||||
|
||||
async def on_header(self, height):
|
||||
if self.ledger.headers.height < height:
|
||||
await self.ledger.on_header.where(
|
||||
|
@ -270,11 +279,29 @@ class IntegrationTestCase(AsyncioTestCase):
|
|||
)
|
||||
return True
|
||||
|
||||
def on_transaction_id(self, txid, ledger=None):
|
||||
return (ledger or self.ledger).on_transaction.where(
|
||||
lambda e: e.tx.id == txid
|
||||
async def send_to_address_and_wait(self, address, amount, blocks_to_generate=0, ledger=None):
|
||||
tx_watch = []
|
||||
txid = None
|
||||
done = False
|
||||
watcher = (ledger or self.ledger).on_transaction.where(
|
||||
lambda e: e.tx.id == txid or done or tx_watch.append(e.tx.id)
|
||||
)
|
||||
|
||||
txid = await self.blockchain.send_to_address(address, amount)
|
||||
done = txid in tx_watch
|
||||
await watcher
|
||||
|
||||
await self.generate_and_wait(blocks_to_generate, [txid], ledger)
|
||||
return txid
|
||||
|
||||
async def generate_and_wait(self, blocks_to_generate, txids, ledger=None):
|
||||
if blocks_to_generate > 0:
|
||||
watcher = (ledger or self.ledger).on_transaction.where(
|
||||
lambda e: ((e.tx.id in txids and txids.remove(e.tx.id)), len(txids) <= 0)[-1] # multi-statement lambda
|
||||
)
|
||||
await self.blockchain.generate(blocks_to_generate)
|
||||
await watcher
|
||||
|
||||
def on_address_update(self, address):
|
||||
return self.ledger.on_transaction.where(
|
||||
lambda e: e.address == address
|
||||
|
@ -345,20 +372,19 @@ class CommandTestCase(IntegrationTestCase):
|
|||
self.skip_libtorrent = True
|
||||
|
||||
async def asyncSetUp(self):
|
||||
await super().asyncSetUp()
|
||||
|
||||
logging.getLogger('lbry.blob_exchange').setLevel(self.VERBOSITY)
|
||||
logging.getLogger('lbry.daemon').setLevel(self.VERBOSITY)
|
||||
logging.getLogger('lbry.stream').setLevel(self.VERBOSITY)
|
||||
logging.getLogger('lbry.wallet').setLevel(self.VERBOSITY)
|
||||
|
||||
await super().asyncSetUp()
|
||||
|
||||
self.daemon = await self.add_daemon(self.wallet_node)
|
||||
|
||||
await self.account.ensure_address_gap()
|
||||
address = (await self.account.receiving.get_addresses(limit=1, only_usable=True))[0]
|
||||
sendtxid = await self.blockchain.send_to_address(address, 10)
|
||||
await self.confirm_tx(sendtxid)
|
||||
await self.generate(5)
|
||||
await self.send_to_address_and_wait(address, 10, 6)
|
||||
|
||||
server_tmp_dir = tempfile.mkdtemp()
|
||||
self.addCleanup(shutil.rmtree, server_tmp_dir)
|
||||
|
@ -455,9 +481,8 @@ class CommandTestCase(IntegrationTestCase):
|
|||
|
||||
async def confirm_tx(self, txid, ledger=None):
|
||||
""" Wait for tx to be in mempool, then generate a block, wait for tx to be in a block. """
|
||||
await self.on_transaction_id(txid, ledger)
|
||||
await self.generate(1)
|
||||
await self.on_transaction_id(txid, ledger)
|
||||
# actually, if it's in the mempool or in the block we're fine
|
||||
await self.generate_and_wait(1, [txid], ledger=ledger)
|
||||
return txid
|
||||
|
||||
async def on_transaction_dict(self, tx):
|
||||
|
@ -508,7 +533,7 @@ class CommandTestCase(IntegrationTestCase):
|
|||
return self.sout(tx)
|
||||
return tx
|
||||
|
||||
async def create_nondeterministic_channel(self, name, price, pubkey_bytes, daemon=None):
|
||||
async def create_nondeterministic_channel(self, name, price, pubkey_bytes, daemon=None, blocking=False):
|
||||
account = (daemon or self.daemon).wallet_manager.default_account
|
||||
claim_address = await account.receiving.get_or_create_usable_address()
|
||||
claim = Claim()
|
||||
|
@ -518,7 +543,7 @@ class CommandTestCase(IntegrationTestCase):
|
|||
claim_address, [self.account], self.account
|
||||
)
|
||||
await tx.sign([self.account])
|
||||
await (daemon or self.daemon).broadcast_or_release(tx, False)
|
||||
await (daemon or self.daemon).broadcast_or_release(tx, blocking)
|
||||
return self.sout(tx)
|
||||
|
||||
def create_upload_file(self, data, prefix=None, suffix=None):
|
||||
|
|
|
@ -1,8 +1,13 @@
|
|||
__node_daemon__ = 'lbrycrdd'
|
||||
__node_cli__ = 'lbrycrd-cli'
|
||||
__node_bin__ = ''
|
||||
__node_url__ = (
|
||||
'https://github.com/lbryio/lbrycrd/releases/download/v0.17.4.6/lbrycrd-linux-1746.zip'
|
||||
__lbcd__ = 'lbcd'
|
||||
__lbcctl__ = 'lbcctl'
|
||||
__lbcwallet__ = 'lbcwallet'
|
||||
__lbcd_url__ = (
|
||||
'https://github.com/lbryio/lbcd/releases/download/' +
|
||||
'v0.22.200-beta/lbcd_0.22.200-beta_TARGET_PLATFORM.tar.gz'
|
||||
)
|
||||
__lbcwallet_url__ = (
|
||||
'https://github.com/lbryio/lbcwallet/releases/download/' +
|
||||
'v0.13.100-alpha-rc2/lbcwallet_0.13.100-alpha-rc2_TARGET_PLATFORM.tar.gz'
|
||||
)
|
||||
__spvserver__ = 'lbry.wallet.server.coin.LBCRegTest'
|
||||
|
||||
|
|
|
@ -365,6 +365,10 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
await self.db.close()
|
||||
await self.headers.close()
|
||||
|
||||
async def tasks_are_done(self):
|
||||
await self._update_tasks.done.wait()
|
||||
await self._other_tasks.done.wait()
|
||||
|
||||
@property
|
||||
def local_height_including_downloaded_height(self):
|
||||
return max(self.headers.height, self._download_height)
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
__hub_url__ = (
|
||||
"https://github.com/lbryio/hub/releases/download/v0.2021.12.18.1/hub"
|
||||
"https://github.com/lbryio/hub/releases/download/v0.2022.01.21.1/hub"
|
||||
)
|
||||
from .node import Conductor
|
||||
from .service import ConductorService
|
||||
|
|
|
@ -5,7 +5,9 @@ import aiohttp
|
|||
|
||||
from lbry import wallet
|
||||
from lbry.wallet.orchstr8.node import (
|
||||
Conductor, get_blockchain_node_from_ledger
|
||||
Conductor,
|
||||
get_lbcd_node_from_ledger,
|
||||
get_lbcwallet_node_from_ledger
|
||||
)
|
||||
from lbry.wallet.orchstr8.service import ConductorService
|
||||
|
||||
|
@ -16,10 +18,11 @@ def get_argument_parser():
|
|||
)
|
||||
subparsers = parser.add_subparsers(dest='command', help='sub-command help')
|
||||
|
||||
subparsers.add_parser("download", help="Download blockchain node binary.")
|
||||
subparsers.add_parser("download", help="Download lbcd and lbcwallet node binaries.")
|
||||
|
||||
start = subparsers.add_parser("start", help="Start orchstr8 service.")
|
||||
start.add_argument("--blockchain", help="Hostname to start blockchain node.")
|
||||
start.add_argument("--lbcd", help="Hostname to start lbcd node.")
|
||||
start.add_argument("--lbcwallet", help="Hostname to start lbcwallet node.")
|
||||
start.add_argument("--spv", help="Hostname to start SPV server.")
|
||||
start.add_argument("--wallet", help="Hostname to start wallet daemon.")
|
||||
|
||||
|
@ -47,7 +50,8 @@ def main():
|
|||
|
||||
if command == 'download':
|
||||
logging.getLogger('blockchain').setLevel(logging.INFO)
|
||||
get_blockchain_node_from_ledger(wallet).ensure()
|
||||
get_lbcd_node_from_ledger(wallet).ensure()
|
||||
get_lbcwallet_node_from_ledger(wallet).ensure()
|
||||
|
||||
elif command == 'generate':
|
||||
loop.run_until_complete(run_remote_command(
|
||||
|
@ -57,9 +61,12 @@ def main():
|
|||
elif command == 'start':
|
||||
|
||||
conductor = Conductor()
|
||||
if getattr(args, 'blockchain', False):
|
||||
conductor.blockchain_node.hostname = args.blockchain
|
||||
loop.run_until_complete(conductor.start_blockchain())
|
||||
if getattr(args, 'lbcd', False):
|
||||
conductor.lbcd_node.hostname = args.lbcd
|
||||
loop.run_until_complete(conductor.start_lbcd())
|
||||
if getattr(args, 'lbcwallet', False):
|
||||
conductor.lbcwallet_node.hostname = args.lbcwallet
|
||||
loop.run_until_complete(conductor.start_lbcwallet())
|
||||
if getattr(args, 'spv', False):
|
||||
conductor.spv_node.hostname = args.spv
|
||||
loop.run_until_complete(conductor.start_spv())
|
||||
|
|
|
@ -8,6 +8,7 @@ import logging
|
|||
import tempfile
|
||||
import subprocess
|
||||
import importlib
|
||||
import platform
|
||||
from distutils.util import strtobool
|
||||
|
||||
from binascii import hexlify
|
||||
|
@ -31,11 +32,18 @@ def get_spvserver_from_ledger(ledger_module):
|
|||
return getattr(spvserver_module, regtest_class_name)
|
||||
|
||||
|
||||
def get_blockchain_node_from_ledger(ledger_module):
|
||||
return BlockchainNode(
|
||||
ledger_module.__node_url__,
|
||||
os.path.join(ledger_module.__node_bin__, ledger_module.__node_daemon__),
|
||||
os.path.join(ledger_module.__node_bin__, ledger_module.__node_cli__)
|
||||
def get_lbcd_node_from_ledger(ledger_module):
|
||||
return LBCDNode(
|
||||
ledger_module.__lbcd_url__,
|
||||
ledger_module.__lbcd__,
|
||||
ledger_module.__lbcctl__
|
||||
)
|
||||
|
||||
def get_lbcwallet_node_from_ledger(ledger_module):
|
||||
return LBCWalletNode(
|
||||
ledger_module.__lbcwallet_url__,
|
||||
ledger_module.__lbcwallet__,
|
||||
ledger_module.__lbcctl__
|
||||
)
|
||||
|
||||
|
||||
|
@ -45,51 +53,52 @@ class Conductor:
|
|||
self.manager_module = WalletManager
|
||||
self.spv_module = get_spvserver_from_ledger(lbry.wallet)
|
||||
|
||||
self.blockchain_node = get_blockchain_node_from_ledger(lbry.wallet)
|
||||
self.lbcd_node = get_lbcd_node_from_ledger(lbry.wallet)
|
||||
self.lbcwallet_node = get_lbcwallet_node_from_ledger(lbry.wallet)
|
||||
self.spv_node = SPVNode(self.spv_module)
|
||||
self.wallet_node = WalletNode(
|
||||
self.manager_module, RegTestLedger, default_seed=seed
|
||||
)
|
||||
self.hub_node = HubNode(__hub_url__, "hub", self.spv_node)
|
||||
|
||||
self.blockchain_started = False
|
||||
self.lbcd_started = False
|
||||
self.lbcwallet_started = False
|
||||
self.spv_started = False
|
||||
self.wallet_started = False
|
||||
self.hub_started = False
|
||||
|
||||
self.log = log.getChild('conductor')
|
||||
|
||||
async def start_blockchain(self):
|
||||
if not self.blockchain_started:
|
||||
asyncio.create_task(self.blockchain_node.start())
|
||||
await self.blockchain_node.running.wait()
|
||||
await self.blockchain_node.generate(200)
|
||||
self.blockchain_started = True
|
||||
async def start_lbcd(self):
|
||||
if not self.lbcd_started:
|
||||
asyncio.create_task(self.lbcd_node.start())
|
||||
await self.lbcd_node.running.wait()
|
||||
self.lbcd_started = True
|
||||
|
||||
async def stop_blockchain(self):
|
||||
if self.blockchain_started:
|
||||
await self.blockchain_node.stop(cleanup=True)
|
||||
self.blockchain_started = False
|
||||
async def stop_lbcd(self, cleanup=True):
|
||||
if self.lbcd_started:
|
||||
await self.lbcd_node.stop(cleanup)
|
||||
self.lbcd_started = False
|
||||
|
||||
async def start_hub(self):
|
||||
if not self.hub_started:
|
||||
asyncio.create_task(self.hub_node.start())
|
||||
await self.blockchain_node.running.wait()
|
||||
await self.lbcwallet_node.running.wait()
|
||||
self.hub_started = True
|
||||
|
||||
async def stop_hub(self):
|
||||
async def stop_hub(self, cleanup=True):
|
||||
if self.hub_started:
|
||||
await self.hub_node.stop(cleanup=True)
|
||||
await self.hub_node.stop(cleanup)
|
||||
self.hub_started = False
|
||||
|
||||
async def start_spv(self):
|
||||
if not self.spv_started:
|
||||
await self.spv_node.start(self.blockchain_node)
|
||||
await self.spv_node.start(self.lbcwallet_node)
|
||||
self.spv_started = True
|
||||
|
||||
async def stop_spv(self):
|
||||
async def stop_spv(self, cleanup=True):
|
||||
if self.spv_started:
|
||||
await self.spv_node.stop(cleanup=True)
|
||||
await self.spv_node.stop(cleanup)
|
||||
self.spv_started = False
|
||||
|
||||
async def start_wallet(self):
|
||||
|
@ -97,13 +106,31 @@ class Conductor:
|
|||
await self.wallet_node.start(self.spv_node)
|
||||
self.wallet_started = True
|
||||
|
||||
async def stop_wallet(self):
|
||||
async def stop_wallet(self, cleanup=True):
|
||||
if self.wallet_started:
|
||||
await self.wallet_node.stop(cleanup=True)
|
||||
await self.wallet_node.stop(cleanup)
|
||||
self.wallet_started = False
|
||||
|
||||
async def start_lbcwallet(self, clean=True):
|
||||
if not self.lbcwallet_started:
|
||||
asyncio.create_task(self.lbcwallet_node.start())
|
||||
await self.lbcwallet_node.running.wait()
|
||||
if clean:
|
||||
mining_addr = await self.lbcwallet_node.get_new_address()
|
||||
self.lbcwallet_node.mining_addr = mining_addr
|
||||
await self.lbcwallet_node.generate(200)
|
||||
# unlock the wallet for the next 1 hour
|
||||
await self.lbcwallet_node.wallet_passphrase("password", 3600)
|
||||
self.lbcwallet_started = True
|
||||
|
||||
async def stop_lbcwallet(self, cleanup=True):
|
||||
if self.lbcwallet_started:
|
||||
await self.lbcwallet_node.stop(cleanup)
|
||||
self.lbcwallet_started = False
|
||||
|
||||
async def start(self):
|
||||
await self.start_blockchain()
|
||||
await self.start_lbcd()
|
||||
await self.start_lbcwallet()
|
||||
await self.start_spv()
|
||||
await self.start_wallet()
|
||||
|
||||
|
@ -111,7 +138,8 @@ class Conductor:
|
|||
all_the_stops = [
|
||||
self.stop_wallet,
|
||||
self.stop_spv,
|
||||
self.stop_blockchain
|
||||
self.stop_lbcwallet,
|
||||
self.stop_lbcd
|
||||
]
|
||||
for stop in all_the_stops:
|
||||
try:
|
||||
|
@ -119,6 +147,11 @@ class Conductor:
|
|||
except Exception as e:
|
||||
log.exception('Exception raised while stopping services:', exc_info=e)
|
||||
|
||||
async def clear_mempool(self):
|
||||
await self.stop_lbcwallet(cleanup=False)
|
||||
await self.stop_lbcd(cleanup=False)
|
||||
await self.start_lbcd()
|
||||
await self.start_lbcwallet(clean=False)
|
||||
|
||||
class WalletNode:
|
||||
|
||||
|
@ -139,10 +172,11 @@ class WalletNode:
|
|||
|
||||
async def start(self, spv_node: 'SPVNode', seed=None, connect=True, config=None):
|
||||
wallets_dir = os.path.join(self.data_path, 'wallets')
|
||||
os.mkdir(wallets_dir)
|
||||
wallet_file_name = os.path.join(wallets_dir, 'my_wallet.json')
|
||||
with open(wallet_file_name, 'w') as wallet_file:
|
||||
wallet_file.write('{"version": 1, "accounts": []}\n')
|
||||
if not os.path.isdir(wallets_dir):
|
||||
os.mkdir(wallets_dir)
|
||||
with open(wallet_file_name, 'w') as wallet_file:
|
||||
wallet_file.write('{"version": 1, "accounts": []}\n')
|
||||
self.manager = self.manager_class.from_config({
|
||||
'ledgers': {
|
||||
self.ledger_class.get_id(): {
|
||||
|
@ -198,14 +232,14 @@ class SPVNode:
|
|||
self.stopped = False
|
||||
self.index_name = uuid4().hex
|
||||
|
||||
async def start(self, blockchain_node: 'BlockchainNode', extraconf=None):
|
||||
async def start(self, lbcwallet_node: 'LBCWalletNode', extraconf=None):
|
||||
self.data_path = tempfile.mkdtemp()
|
||||
conf = {
|
||||
'DESCRIPTION': '',
|
||||
'PAYMENT_ADDRESS': '',
|
||||
'DAILY_FEE': '0',
|
||||
'DB_DIRECTORY': self.data_path,
|
||||
'DAEMON_URL': blockchain_node.rpc_url,
|
||||
'DAEMON_URL': lbcwallet_node.rpc_url,
|
||||
'REORG_LIMIT': '100',
|
||||
'HOST': self.hostname,
|
||||
'TCP_PORT': str(self.port),
|
||||
|
@ -240,18 +274,19 @@ class SPVNode:
|
|||
shutil.rmtree(self.data_path, ignore_errors=True)
|
||||
|
||||
|
||||
class BlockchainProcess(asyncio.SubprocessProtocol):
|
||||
class LBCDProcess(asyncio.SubprocessProtocol):
|
||||
|
||||
IGNORE_OUTPUT = [
|
||||
b'keypool keep',
|
||||
b'keypool reserve',
|
||||
b'keypool return',
|
||||
b'Block submitted',
|
||||
]
|
||||
|
||||
def __init__(self):
|
||||
self.ready = asyncio.Event()
|
||||
self.stopped = asyncio.Event()
|
||||
self.log = log.getChild('blockchain')
|
||||
self.log = log.getChild('lbcd')
|
||||
|
||||
def pipe_data_received(self, fd, data):
|
||||
if self.log and not any(ignore in data for ignore in self.IGNORE_OUTPUT):
|
||||
|
@ -262,7 +297,7 @@ class BlockchainProcess(asyncio.SubprocessProtocol):
|
|||
if b'Error:' in data:
|
||||
self.ready.set()
|
||||
raise SystemError(data.decode())
|
||||
if b'Done loading' in data:
|
||||
if b'RPCS: RPC server listening on' in data:
|
||||
self.ready.set()
|
||||
|
||||
def process_exited(self):
|
||||
|
@ -270,10 +305,34 @@ class BlockchainProcess(asyncio.SubprocessProtocol):
|
|||
self.ready.set()
|
||||
|
||||
|
||||
class BlockchainNode:
|
||||
class WalletProcess(asyncio.SubprocessProtocol):
|
||||
|
||||
P2SH_SEGWIT_ADDRESS = "p2sh-segwit"
|
||||
BECH32_ADDRESS = "bech32"
|
||||
IGNORE_OUTPUT = [
|
||||
]
|
||||
|
||||
def __init__(self):
|
||||
self.ready = asyncio.Event()
|
||||
self.stopped = asyncio.Event()
|
||||
self.log = log.getChild('lbcwallet')
|
||||
self.transport: Optional[asyncio.transports.SubprocessTransport] = None
|
||||
|
||||
def pipe_data_received(self, fd, data):
|
||||
if self.log and not any(ignore in data for ignore in self.IGNORE_OUTPUT):
|
||||
if b'Error:' in data:
|
||||
self.log.error(data.decode())
|
||||
else:
|
||||
self.log.info(data.decode())
|
||||
if b'Error:' in data:
|
||||
self.ready.set()
|
||||
raise SystemError(data.decode())
|
||||
if b'WLLT: Finished rescan' in data:
|
||||
self.ready.set()
|
||||
|
||||
def process_exited(self):
|
||||
self.stopped.set()
|
||||
self.ready.set()
|
||||
|
||||
class LBCDNode:
|
||||
|
||||
def __init__(self, url, daemon, cli):
|
||||
self.latest_release_url = url
|
||||
|
@ -281,28 +340,22 @@ class BlockchainNode:
|
|||
self.bin_dir = os.path.join(self.project_dir, 'bin')
|
||||
self.daemon_bin = os.path.join(self.bin_dir, daemon)
|
||||
self.cli_bin = os.path.join(self.bin_dir, cli)
|
||||
self.log = log.getChild('blockchain')
|
||||
self.data_path = None
|
||||
self.log = log.getChild('lbcd')
|
||||
self.data_path = tempfile.mkdtemp()
|
||||
self.protocol = None
|
||||
self.transport = None
|
||||
self.block_expected = 0
|
||||
self.hostname = 'localhost'
|
||||
self.peerport = 9246 + 2 # avoid conflict with default peer port
|
||||
self.rpcport = 9245 + 2 # avoid conflict with default rpc port
|
||||
self.peerport = 29246
|
||||
self.rpcport = 29245
|
||||
self.rpcuser = 'rpcuser'
|
||||
self.rpcpassword = 'rpcpassword'
|
||||
self.stopped = False
|
||||
self.restart_ready = asyncio.Event()
|
||||
self.restart_ready.set()
|
||||
self.stopped = True
|
||||
self.running = asyncio.Event()
|
||||
|
||||
@property
|
||||
def rpc_url(self):
|
||||
return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.rpcport}/'
|
||||
|
||||
def is_expected_block(self, e: BlockHeightEvent):
|
||||
return self.block_expected == e.height
|
||||
|
||||
@property
|
||||
def exists(self):
|
||||
return (
|
||||
|
@ -311,6 +364,12 @@ class BlockchainNode:
|
|||
)
|
||||
|
||||
def download(self):
|
||||
uname = platform.uname()
|
||||
target_os = str.lower(uname.system)
|
||||
target_arch = str.replace(uname.machine, 'x86_64', 'amd64')
|
||||
target_platform = target_os + '_' + target_arch
|
||||
self.latest_release_url = str.replace(self.latest_release_url, 'TARGET_PLATFORM', target_platform)
|
||||
|
||||
downloaded_file = os.path.join(
|
||||
self.bin_dir,
|
||||
self.latest_release_url[self.latest_release_url.rfind('/')+1:]
|
||||
|
@ -345,34 +404,34 @@ class BlockchainNode:
|
|||
|
||||
async def start(self):
|
||||
assert self.ensure()
|
||||
self.data_path = tempfile.mkdtemp()
|
||||
loop = asyncio.get_event_loop()
|
||||
asyncio.get_child_watcher().attach_loop(loop)
|
||||
command = [
|
||||
self.daemon_bin,
|
||||
f'-datadir={self.data_path}', '-printtoconsole', '-regtest', '-server', '-txindex',
|
||||
f'-rpcuser={self.rpcuser}', f'-rpcpassword={self.rpcpassword}', f'-rpcport={self.rpcport}',
|
||||
f'-port={self.peerport}'
|
||||
'--notls',
|
||||
f'--datadir={self.data_path}',
|
||||
'--regtest', f'--listen=127.0.0.1:{self.peerport}', f'--rpclisten=127.0.0.1:{self.rpcport}',
|
||||
'--txindex', f'--rpcuser={self.rpcuser}', f'--rpcpass={self.rpcpassword}'
|
||||
]
|
||||
self.log.info(' '.join(command))
|
||||
while not self.stopped:
|
||||
while self.stopped:
|
||||
if self.running.is_set():
|
||||
await asyncio.sleep(1)
|
||||
continue
|
||||
await self.restart_ready.wait()
|
||||
try:
|
||||
self.transport, self.protocol = await loop.subprocess_exec(
|
||||
BlockchainProcess, *command
|
||||
LBCDProcess, *command
|
||||
)
|
||||
await self.protocol.ready.wait()
|
||||
assert not self.protocol.stopped.is_set()
|
||||
self.running.set()
|
||||
self.stopped = False
|
||||
except asyncio.CancelledError:
|
||||
self.running.clear()
|
||||
raise
|
||||
except Exception as e:
|
||||
self.running.clear()
|
||||
log.exception('failed to start lbrycrdd', exc_info=e)
|
||||
log.exception('failed to start lbcd', exc_info=e)
|
||||
|
||||
async def stop(self, cleanup=True):
|
||||
self.stopped = True
|
||||
|
@ -381,35 +440,161 @@ class BlockchainNode:
|
|||
await self.protocol.stopped.wait()
|
||||
self.transport.close()
|
||||
finally:
|
||||
self.log.info("Done shutting down " + self.daemon_bin)
|
||||
if cleanup:
|
||||
self.cleanup()
|
||||
|
||||
async def clear_mempool(self):
|
||||
self.restart_ready.clear()
|
||||
self.transport.terminate()
|
||||
await self.protocol.stopped.wait()
|
||||
self.transport.close()
|
||||
self.running.clear()
|
||||
os.remove(os.path.join(self.data_path, 'regtest', 'mempool.dat'))
|
||||
self.restart_ready.set()
|
||||
await self.running.wait()
|
||||
self.running.clear()
|
||||
|
||||
def cleanup(self):
|
||||
assert self.stopped
|
||||
shutil.rmtree(self.data_path, ignore_errors=True)
|
||||
|
||||
|
||||
class LBCWalletNode:
|
||||
P2SH_SEGWIT_ADDRESS = "p2sh-segwit"
|
||||
BECH32_ADDRESS = "bech32"
|
||||
|
||||
def __init__(self, url, lbcwallet, cli):
|
||||
self.latest_release_url = url
|
||||
self.project_dir = os.path.dirname(os.path.dirname(__file__))
|
||||
self.bin_dir = os.path.join(self.project_dir, 'bin')
|
||||
self.lbcwallet_bin = os.path.join(self.bin_dir, lbcwallet)
|
||||
self.cli_bin = os.path.join(self.bin_dir, cli)
|
||||
self.log = log.getChild('lbcwallet')
|
||||
self.protocol = None
|
||||
self.transport = None
|
||||
self.hostname = 'localhost'
|
||||
self.lbcd_rpcport = 29245
|
||||
self.lbcwallet_rpcport = 29244
|
||||
self.rpcuser = 'rpcuser'
|
||||
self.rpcpassword = 'rpcpassword'
|
||||
self.data_path = tempfile.mkdtemp()
|
||||
self.stopped = True
|
||||
self.running = asyncio.Event()
|
||||
self.block_expected = 0
|
||||
self.mining_addr = ''
|
||||
|
||||
@property
|
||||
def rpc_url(self):
|
||||
# FIXME: somehow the hub/sdk doesn't learn the blocks through the Walet RPC port, why?
|
||||
# return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.lbcwallet_rpcport}/'
|
||||
return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.lbcd_rpcport}/'
|
||||
|
||||
def is_expected_block(self, e: BlockHeightEvent):
|
||||
return self.block_expected == e.height
|
||||
|
||||
@property
|
||||
def exists(self):
|
||||
return (
|
||||
os.path.exists(self.lbcwallet_bin)
|
||||
)
|
||||
|
||||
def download(self):
|
||||
uname = platform.uname()
|
||||
target_os = str.lower(uname.system)
|
||||
target_arch = str.replace(uname.machine, 'x86_64', 'amd64')
|
||||
target_platform = target_os + '_' + target_arch
|
||||
self.latest_release_url = str.replace(self.latest_release_url, 'TARGET_PLATFORM', target_platform)
|
||||
|
||||
downloaded_file = os.path.join(
|
||||
self.bin_dir,
|
||||
self.latest_release_url[self.latest_release_url.rfind('/')+1:]
|
||||
)
|
||||
|
||||
if not os.path.exists(self.bin_dir):
|
||||
os.mkdir(self.bin_dir)
|
||||
|
||||
if not os.path.exists(downloaded_file):
|
||||
self.log.info('Downloading: %s', self.latest_release_url)
|
||||
with urllib.request.urlopen(self.latest_release_url) as response:
|
||||
with open(downloaded_file, 'wb') as out_file:
|
||||
shutil.copyfileobj(response, out_file)
|
||||
|
||||
self.log.info('Extracting: %s', downloaded_file)
|
||||
|
||||
if downloaded_file.endswith('.zip'):
|
||||
with zipfile.ZipFile(downloaded_file) as dotzip:
|
||||
dotzip.extractall(self.bin_dir)
|
||||
# zipfile bug https://bugs.python.org/issue15795
|
||||
os.chmod(self.lbcwallet_bin, 0o755)
|
||||
|
||||
elif downloaded_file.endswith('.tar.gz'):
|
||||
with tarfile.open(downloaded_file) as tar:
|
||||
tar.extractall(self.bin_dir)
|
||||
|
||||
return self.exists
|
||||
|
||||
def ensure(self):
|
||||
return self.exists or self.download()
|
||||
|
||||
async def start(self):
|
||||
assert self.ensure()
|
||||
loop = asyncio.get_event_loop()
|
||||
asyncio.get_child_watcher().attach_loop(loop)
|
||||
|
||||
command = [
|
||||
self.lbcwallet_bin,
|
||||
'--noservertls', '--noclienttls',
|
||||
'--regtest',
|
||||
f'--rpcconnect=127.0.0.1:{self.lbcd_rpcport}', f'--rpclisten=127.0.0.1:{self.lbcwallet_rpcport}',
|
||||
'--createtemp', f'--appdata={self.data_path}',
|
||||
f'--username={self.rpcuser}', f'--password={self.rpcpassword}'
|
||||
]
|
||||
self.log.info(' '.join(command))
|
||||
while self.stopped:
|
||||
if self.running.is_set():
|
||||
await asyncio.sleep(1)
|
||||
continue
|
||||
try:
|
||||
self.transport, self.protocol = await loop.subprocess_exec(
|
||||
WalletProcess, *command
|
||||
)
|
||||
self.protocol.transport = self.transport
|
||||
await self.protocol.ready.wait()
|
||||
assert not self.protocol.stopped.is_set()
|
||||
self.running.set()
|
||||
self.stopped = False
|
||||
except asyncio.CancelledError:
|
||||
self.running.clear()
|
||||
raise
|
||||
except Exception as e:
|
||||
self.running.clear()
|
||||
log.exception('failed to start lbcwallet', exc_info=e)
|
||||
|
||||
def cleanup(self):
|
||||
assert self.stopped
|
||||
shutil.rmtree(self.data_path, ignore_errors=True)
|
||||
|
||||
async def stop(self, cleanup=True):
|
||||
self.stopped = True
|
||||
try:
|
||||
self.transport.terminate()
|
||||
await self.protocol.stopped.wait()
|
||||
self.transport.close()
|
||||
finally:
|
||||
self.log.info("Done shutting down " + self.lbcwallet_bin)
|
||||
if cleanup:
|
||||
self.cleanup()
|
||||
self.running.clear()
|
||||
|
||||
async def _cli_cmnd(self, *args):
|
||||
cmnd_args = [
|
||||
self.cli_bin, f'-datadir={self.data_path}', '-regtest',
|
||||
f'-rpcuser={self.rpcuser}', f'-rpcpassword={self.rpcpassword}', f'-rpcport={self.rpcport}'
|
||||
self.cli_bin,
|
||||
f'--rpcuser={self.rpcuser}', f'--rpcpass={self.rpcpassword}', '--notls', '--regtest', '--wallet'
|
||||
] + list(args)
|
||||
self.log.info(' '.join(cmnd_args))
|
||||
loop = asyncio.get_event_loop()
|
||||
asyncio.get_child_watcher().attach_loop(loop)
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
*cmnd_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
|
||||
*cmnd_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
||||
)
|
||||
out, _ = await process.communicate()
|
||||
out, err = await process.communicate()
|
||||
result = out.decode().strip()
|
||||
err = err.decode().strip()
|
||||
if len(result) <= 0 and err.startswith('-'):
|
||||
raise Exception(err)
|
||||
if err and 'creating a default config file' not in err:
|
||||
log.warning(err)
|
||||
self.log.info(result)
|
||||
if result.startswith('error code'):
|
||||
raise Exception(result)
|
||||
|
@ -417,7 +602,14 @@ class BlockchainNode:
|
|||
|
||||
def generate(self, blocks):
|
||||
self.block_expected += blocks
|
||||
return self._cli_cmnd('generate', str(blocks))
|
||||
return self._cli_cmnd('generatetoaddress', str(blocks), self.mining_addr)
|
||||
|
||||
def generate_to_address(self, blocks, addr):
|
||||
self.block_expected += blocks
|
||||
return self._cli_cmnd('generatetoaddress', str(blocks), addr)
|
||||
|
||||
def wallet_passphrase(self, passphrase, timeout):
|
||||
return self._cli_cmnd('walletpassphrase', passphrase, str(timeout))
|
||||
|
||||
def invalidate_block(self, blockhash):
|
||||
return self._cli_cmnd('invalidateblock', blockhash)
|
||||
|
@ -434,7 +626,7 @@ class BlockchainNode:
|
|||
def get_raw_change_address(self):
|
||||
return self._cli_cmnd('getrawchangeaddress')
|
||||
|
||||
def get_new_address(self, address_type):
|
||||
def get_new_address(self, address_type='legacy'):
|
||||
return self._cli_cmnd('getnewaddress', "", address_type)
|
||||
|
||||
async def get_balance(self):
|
||||
|
@ -450,7 +642,10 @@ class BlockchainNode:
|
|||
return self._cli_cmnd('createrawtransaction', json.dumps(inputs), json.dumps(outputs))
|
||||
|
||||
async def sign_raw_transaction_with_wallet(self, tx):
|
||||
return json.loads(await self._cli_cmnd('signrawtransactionwithwallet', tx))['hex'].encode()
|
||||
# the "withwallet" portion should only come into play if we are doing segwit.
|
||||
# and "withwallet" doesn't exist on lbcd yet.
|
||||
result = await self._cli_cmnd('signrawtransaction', tx)
|
||||
return json.loads(result)['hex'].encode()
|
||||
|
||||
def decode_raw_transaction(self, tx):
|
||||
return self._cli_cmnd('decoderawtransaction', hexlify(tx.raw).decode())
|
||||
|
@ -500,8 +695,6 @@ class HubNode:
|
|||
self.hostname = 'localhost'
|
||||
self.rpcport = 50051 # avoid conflict with default rpc port
|
||||
self.stopped = False
|
||||
self.restart_ready = asyncio.Event()
|
||||
self.restart_ready.set()
|
||||
self.running = asyncio.Event()
|
||||
|
||||
@property
|
||||
|
@ -558,7 +751,6 @@ class HubNode:
|
|||
if self.running.is_set():
|
||||
await asyncio.sleep(1)
|
||||
continue
|
||||
await self.restart_ready.wait()
|
||||
try:
|
||||
if not self.debug:
|
||||
self.transport, self.protocol = await loop.subprocess_exec(
|
||||
|
|
|
@ -61,8 +61,10 @@ class ConductorService:
|
|||
#set_logging(
|
||||
# self.stack.ledger_module, logging.DEBUG, WebSocketLogHandler(self.send_message)
|
||||
#)
|
||||
self.stack.blockchain_started or await self.stack.start_blockchain()
|
||||
self.send_message({'type': 'service', 'name': 'blockchain', 'port': self.stack.blockchain_node.port})
|
||||
self.stack.lbcd_started or await self.stack.start_lbcd()
|
||||
self.send_message({'type': 'service', 'name': 'lbcd', 'port': self.stack.lbcd_node.port})
|
||||
self.stack.lbcwallet_started or await self.stack.start_lbcwallet()
|
||||
self.send_message({'type': 'service', 'name': 'lbcwallet', 'port': self.stack.lbcwallet_node.port})
|
||||
self.stack.spv_started or await self.stack.start_spv()
|
||||
self.send_message({'type': 'service', 'name': 'spv', 'port': self.stack.spv_node.port})
|
||||
self.stack.wallet_started or await self.stack.start_wallet()
|
||||
|
@ -74,7 +76,7 @@ class ConductorService:
|
|||
async def generate(self, request):
|
||||
data = await request.post()
|
||||
blocks = data.get('blocks', 1)
|
||||
await self.stack.blockchain_node.generate(int(blocks))
|
||||
await self.stack.lbcwallet_node.generate(int(blocks))
|
||||
return json_response({'blocks': blocks})
|
||||
|
||||
async def transfer(self, request):
|
||||
|
@ -85,11 +87,14 @@ class ConductorService:
|
|||
if not address:
|
||||
raise ValueError("No address was provided.")
|
||||
amount = data.get('amount', 1)
|
||||
txid = await self.stack.blockchain_node.send_to_address(address, amount)
|
||||
if self.stack.wallet_started:
|
||||
await self.stack.wallet_node.ledger.on_transaction.where(
|
||||
lambda e: e.tx.id == txid and e.address == address
|
||||
watcher = self.stack.wallet_node.ledger.on_transaction.where(
|
||||
lambda e: e.address == address # and e.tx.id == txid -- might stall; see send_to_address_and_wait
|
||||
)
|
||||
txid = await self.stack.lbcwallet_node.send_to_address(address, amount)
|
||||
await watcher
|
||||
else:
|
||||
txid = await self.stack.lbcwallet_node.send_to_address(address, amount)
|
||||
return json_response({
|
||||
'address': address,
|
||||
'amount': amount,
|
||||
|
@ -98,7 +103,7 @@ class ConductorService:
|
|||
|
||||
async def balance(self, _):
|
||||
return json_response({
|
||||
'balance': await self.stack.blockchain_node.get_balance()
|
||||
'balance': await self.stack.lbcwallet_node.get_balance()
|
||||
})
|
||||
|
||||
async def log(self, request):
|
||||
|
@ -129,7 +134,7 @@ class ConductorService:
|
|||
'type': 'status',
|
||||
'height': self.stack.wallet_node.ledger.headers.height,
|
||||
'balance': satoshis_to_coins(await self.stack.wallet_node.account.get_balance()),
|
||||
'miner': await self.stack.blockchain_node.get_balance()
|
||||
'miner': await self.stack.lbcwallet_node.get_balance()
|
||||
})
|
||||
|
||||
def send_message(self, msg):
|
||||
|
|
|
@ -53,7 +53,7 @@ class Daemon:
|
|||
self.max_retry = max_retry
|
||||
self._height = None
|
||||
self.available_rpcs = {}
|
||||
self.connector = aiohttp.TCPConnector()
|
||||
self.connector = aiohttp.TCPConnector(ssl=False)
|
||||
self._block_hash_cache = LRUCacheWithMetrics(100000)
|
||||
self._block_cache = LRUCacheWithMetrics(2 ** 13, metric_name='block', namespace=NAMESPACE)
|
||||
|
||||
|
@ -250,14 +250,14 @@ class Daemon:
|
|||
async def deserialised_block(self, hex_hash):
|
||||
"""Return the deserialised block with the given hex hash."""
|
||||
if hex_hash not in self._block_cache:
|
||||
block = await self._send_single('getblock', (hex_hash, True))
|
||||
block = await self._send_single('getblock', (hex_hash, 1))
|
||||
self._block_cache[hex_hash] = block
|
||||
return block
|
||||
return self._block_cache[hex_hash]
|
||||
|
||||
async def raw_blocks(self, hex_hashes):
|
||||
"""Return the raw binary blocks with the given hex hashes."""
|
||||
params_iterable = ((h, False) for h in hex_hashes)
|
||||
params_iterable = ((h, 0) for h in hex_hashes)
|
||||
blocks = await self._send_vector('getblock', params_iterable)
|
||||
# Convert hex string to bytes
|
||||
return [hex_to_bytes(block) for block in blocks]
|
||||
|
@ -334,42 +334,7 @@ class LBCDaemon(Daemon):
|
|||
async def getrawtransaction(self, hex_hash, verbose=False):
|
||||
return await super().getrawtransaction(hex_hash=hex_hash, verbose=verbose)
|
||||
|
||||
@handles_errors
|
||||
async def getclaimbyid(self, claim_id):
|
||||
'''Given a claim id, retrieves claim information.'''
|
||||
return await self._send_single('getclaimbyid', (claim_id,))
|
||||
|
||||
@handles_errors
|
||||
async def getclaimsbyids(self, claim_ids):
|
||||
'''Given a list of claim ids, batches calls to retrieve claim information.'''
|
||||
return await self._send_vector('getclaimbyid', ((claim_id,) for claim_id in claim_ids))
|
||||
|
||||
@handles_errors
|
||||
async def getclaimsforname(self, name):
|
||||
'''Given a name, retrieves all claims matching that name.'''
|
||||
return await self._send_single('getclaimsforname', (name,))
|
||||
|
||||
@handles_errors
|
||||
async def getclaimsfortx(self, txid):
|
||||
'''Given a txid, returns the claims it make.'''
|
||||
return await self._send_single('getclaimsfortx', (txid,)) or []
|
||||
|
||||
@handles_errors
|
||||
async def getnameproof(self, name, block_hash=None):
|
||||
'''Given a name and optional block_hash, returns a name proof and winner, if any.'''
|
||||
return await self._send_single('getnameproof', (name, block_hash,) if block_hash else (name,))
|
||||
|
||||
@handles_errors
|
||||
async def getvalueforname(self, name):
|
||||
'''Given a name, returns the winning claim value.'''
|
||||
return await self._send_single('getvalueforname', (name,))
|
||||
|
||||
@handles_errors
|
||||
async def getnamesintrie(self):
|
||||
'''Given a name, returns the winning claim value.'''
|
||||
return await self._send_single('getnamesintrie')
|
||||
|
||||
@handles_errors
|
||||
async def claimname(self, name, hexvalue, amount):
|
||||
'''Claim a name, used for functional tests only.'''
|
||||
return await self._send_single('claimname', (name, hexvalue, float(amount)))
|
||||
|
|
|
@ -1,32 +1,130 @@
|
|||
import struct
|
||||
import rocksdb
|
||||
from typing import Optional
|
||||
from lbry.wallet.server.db import DB_PREFIXES
|
||||
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
|
||||
|
||||
|
||||
class KeyValueStorage:
|
||||
class RocksDBStore:
|
||||
def __init__(self, path: str, cache_mb: int, max_open_files: int, secondary_path: str = ''):
|
||||
# Use snappy compression (the default)
|
||||
self.path = path
|
||||
self._max_open_files = max_open_files
|
||||
self.db = rocksdb.DB(path, self.get_options(), secondary_name=secondary_path)
|
||||
# self.multi_get = self.db.multi_get
|
||||
|
||||
def get_options(self):
|
||||
return rocksdb.Options(
|
||||
create_if_missing=True, use_fsync=True, target_file_size_base=33554432,
|
||||
max_open_files=self._max_open_files
|
||||
)
|
||||
|
||||
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
|
||||
raise NotImplemented()
|
||||
return self.db.get(key, fill_cache=fill_cache)
|
||||
|
||||
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
|
||||
include_key=True, include_value=True, fill_cache=True):
|
||||
raise NotImplemented()
|
||||
return RocksDBIterator(
|
||||
self.db, reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
|
||||
prefix=prefix, include_key=include_key, include_value=include_value
|
||||
)
|
||||
|
||||
def write_batch(self, transaction: bool = False):
|
||||
raise NotImplemented()
|
||||
def write_batch(self, disable_wal: bool = False, sync: bool = False):
|
||||
return RocksDBWriteBatch(self.db, sync=sync, disable_wal=disable_wal)
|
||||
|
||||
def close(self):
|
||||
raise NotImplemented()
|
||||
self.db.close()
|
||||
self.db = None
|
||||
|
||||
@property
|
||||
def closed(self) -> bool:
|
||||
raise NotImplemented()
|
||||
return self.db is None
|
||||
|
||||
def try_catch_up_with_primary(self):
|
||||
self.db.try_catch_up_with_primary()
|
||||
|
||||
|
||||
class RocksDBWriteBatch:
|
||||
def __init__(self, db: rocksdb.DB, sync: bool = False, disable_wal: bool = False):
|
||||
self.batch = rocksdb.WriteBatch()
|
||||
self.db = db
|
||||
self.sync = sync
|
||||
self.disable_wal = disable_wal
|
||||
|
||||
def __enter__(self):
|
||||
return self.batch
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
if not exc_val:
|
||||
self.db.write(self.batch, sync=self.sync, disable_wal=self.disable_wal)
|
||||
|
||||
|
||||
class RocksDBIterator:
|
||||
"""An iterator for RocksDB."""
|
||||
|
||||
__slots__ = [
|
||||
'start',
|
||||
'prefix',
|
||||
'stop',
|
||||
'iterator',
|
||||
'include_key',
|
||||
'include_value',
|
||||
'prev_k',
|
||||
'reverse',
|
||||
'include_start',
|
||||
'include_stop'
|
||||
]
|
||||
|
||||
def __init__(self, db: rocksdb.DB, prefix: bytes = None, start: bool = None, stop: bytes = None,
|
||||
include_key: bool = True, include_value: bool = True, reverse: bool = False,
|
||||
include_start: bool = True, include_stop: bool = False):
|
||||
assert (start is None and stop is None) or (prefix is None), 'cannot use start/stop and prefix'
|
||||
self.start = start
|
||||
self.prefix = prefix
|
||||
self.stop = stop
|
||||
self.iterator = db.iteritems() if not reverse else reversed(db.iteritems())
|
||||
if prefix is not None:
|
||||
self.iterator.seek(prefix)
|
||||
elif start is not None:
|
||||
self.iterator.seek(start)
|
||||
self.include_key = include_key
|
||||
self.include_value = include_value
|
||||
self.prev_k = None
|
||||
self.reverse = reverse
|
||||
self.include_start = include_start
|
||||
self.include_stop = include_stop
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def _check_stop_iteration(self, key: bytes):
|
||||
if self.stop is not None and (key.startswith(self.stop) or self.stop < key[:len(self.stop)]):
|
||||
raise StopIteration
|
||||
elif self.start is not None and self.start > key[:len(self.start)]:
|
||||
raise StopIteration
|
||||
elif self.prefix is not None and not key.startswith(self.prefix):
|
||||
raise StopIteration
|
||||
|
||||
def __next__(self):
|
||||
# TODO: include start/stop on/off
|
||||
# check for needing to stop from previous iteration
|
||||
if self.prev_k is not None:
|
||||
self._check_stop_iteration(self.prev_k)
|
||||
k, v = next(self.iterator)
|
||||
self._check_stop_iteration(k)
|
||||
self.prev_k = k
|
||||
|
||||
if self.include_key and self.include_value:
|
||||
return k, v
|
||||
elif self.include_key:
|
||||
return k
|
||||
return v
|
||||
|
||||
|
||||
class PrefixDB:
|
||||
UNDO_KEY_STRUCT = struct.Struct(b'>Q')
|
||||
|
||||
def __init__(self, db: KeyValueStorage, max_undo_depth: int = 200, unsafe_prefixes=None):
|
||||
def __init__(self, db: RocksDBStore, max_undo_depth: int = 200, unsafe_prefixes=None):
|
||||
self._db = db
|
||||
self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes)
|
||||
self._max_undo_depth = max_undo_depth
|
||||
|
@ -37,7 +135,7 @@ class PrefixDB:
|
|||
Changes written cannot be undone
|
||||
"""
|
||||
try:
|
||||
with self._db.write_batch(transaction=True) as batch:
|
||||
with self._db.write_batch(sync=True) as batch:
|
||||
batch_put = batch.put
|
||||
batch_delete = batch.delete
|
||||
for staged_change in self._op_stack:
|
||||
|
@ -61,7 +159,7 @@ class PrefixDB:
|
|||
include_value=False
|
||||
))
|
||||
try:
|
||||
with self._db.write_batch(transaction=True) as batch:
|
||||
with self._db.write_batch(sync=True) as batch:
|
||||
batch_put = batch.put
|
||||
batch_delete = batch.delete
|
||||
for staged_change in self._op_stack:
|
||||
|
@ -82,7 +180,7 @@ class PrefixDB:
|
|||
undo_key = DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height)
|
||||
self._op_stack.apply_packed_undo_ops(self._db.get(undo_key))
|
||||
try:
|
||||
with self._db.write_batch(transaction=True) as batch:
|
||||
with self._db.write_batch(sync=True) as batch:
|
||||
batch_put = batch.put
|
||||
batch_delete = batch.delete
|
||||
for staged_change in self._op_stack:
|
||||
|
@ -108,6 +206,9 @@ class PrefixDB:
|
|||
if not self._db.closed:
|
||||
self._db.close()
|
||||
|
||||
def try_catch_up_with_primary(self):
|
||||
self._db.try_catch_up_with_primary()
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
return self._db.closed
|
||||
|
|
|
@ -4,7 +4,7 @@ import array
|
|||
import base64
|
||||
from typing import Union, Tuple, NamedTuple, Optional
|
||||
from lbry.wallet.server.db import DB_PREFIXES
|
||||
from lbry.wallet.server.db.db import KeyValueStorage, PrefixDB
|
||||
from lbry.wallet.server.db.db import RocksDBStore, PrefixDB
|
||||
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
|
||||
from lbry.schema.url import normalize_name
|
||||
|
||||
|
@ -38,7 +38,7 @@ class PrefixRow(metaclass=PrefixRowType):
|
|||
value_struct: struct.Struct
|
||||
key_part_lambdas = []
|
||||
|
||||
def __init__(self, db: KeyValueStorage, op_stack: RevertableOpStack):
|
||||
def __init__(self, db: RocksDBStore, op_stack: RevertableOpStack):
|
||||
self._db = db
|
||||
self._op_stack = op_stack
|
||||
|
||||
|
@ -1595,40 +1595,10 @@ class BlockTxsPrefixRow(PrefixRow):
|
|||
return cls.pack_key(height), cls.pack_value(tx_hashes)
|
||||
|
||||
|
||||
class LevelDBStore(KeyValueStorage):
|
||||
def __init__(self, path: str, cache_mb: int, max_open_files: int):
|
||||
import plyvel
|
||||
self.db = plyvel.DB(
|
||||
path, create_if_missing=True, max_open_files=max_open_files,
|
||||
lru_cache_size=cache_mb * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
|
||||
max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
|
||||
)
|
||||
|
||||
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
|
||||
return self.db.get(key, fill_cache=fill_cache)
|
||||
|
||||
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
|
||||
include_key=True, include_value=True, fill_cache=True):
|
||||
return self.db.iterator(
|
||||
reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
|
||||
prefix=prefix, include_key=include_key, include_value=include_value, fill_cache=fill_cache
|
||||
)
|
||||
|
||||
def write_batch(self, transaction: bool = False, sync: bool = False):
|
||||
return self.db.write_batch(transaction=transaction, sync=sync)
|
||||
|
||||
def close(self):
|
||||
return self.db.close()
|
||||
|
||||
@property
|
||||
def closed(self) -> bool:
|
||||
return self.db.closed
|
||||
|
||||
|
||||
class HubDB(PrefixDB):
|
||||
def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512,
|
||||
unsafe_prefixes: Optional[typing.Set[bytes]] = None):
|
||||
db = LevelDBStore(path, cache_mb, max_open_files)
|
||||
secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None):
|
||||
db = RocksDBStore(path, cache_mb, max_open_files, secondary_path=secondary_path)
|
||||
super().__init__(db, reorg_limit, unsafe_prefixes=unsafe_prefixes)
|
||||
self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack)
|
||||
self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack)
|
||||
|
|
|
@ -26,7 +26,10 @@ class Server:
|
|||
self.prometheus_server: typing.Optional[PrometheusServer] = None
|
||||
|
||||
self.session_mgr = LBRYSessionManager(
|
||||
env, db, bp, daemon, self.shutdown_event
|
||||
env, db, bp.mempool, bp.history_cache, bp.resolve_cache, bp.resolve_outputs_cache, daemon,
|
||||
self.shutdown_event,
|
||||
on_available_callback=bp.status_server.set_available,
|
||||
on_unavailable_callback=bp.status_server.set_unavailable
|
||||
)
|
||||
self._indexer_task = None
|
||||
|
||||
|
|
|
@ -170,13 +170,16 @@ class SessionManager:
|
|||
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
|
||||
)
|
||||
|
||||
def __init__(self, env: 'Env', db: LevelDB, bp: BlockProcessor, daemon: 'Daemon', shutdown_event: asyncio.Event):
|
||||
def __init__(self, env: 'Env', db: LevelDB, mempool, history_cache, resolve_cache, resolve_outputs_cache,
|
||||
daemon: 'Daemon', shutdown_event: asyncio.Event,
|
||||
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
|
||||
env.max_send = max(350000, env.max_send)
|
||||
self.env = env
|
||||
self.db = db
|
||||
self.bp = bp
|
||||
self.on_available_callback = on_available_callback
|
||||
self.on_unavailable_callback = on_unavailable_callback
|
||||
self.daemon = daemon
|
||||
self.mempool = bp.mempool
|
||||
self.mempool = mempool
|
||||
self.shutdown_event = shutdown_event
|
||||
self.logger = util.class_logger(__name__, self.__class__.__name__)
|
||||
self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
|
||||
|
@ -186,7 +189,9 @@ class SessionManager:
|
|||
self.cur_group = SessionGroup(0)
|
||||
self.txs_sent = 0
|
||||
self.start_time = time.time()
|
||||
self.history_cache = self.bp.history_cache
|
||||
self.history_cache = history_cache
|
||||
self.resolve_cache = resolve_cache
|
||||
self.resolve_outputs_cache = resolve_outputs_cache
|
||||
self.notified_height: typing.Optional[int] = None
|
||||
# Cache some idea of room to avoid recounting on each subscription
|
||||
self.subs_room = 0
|
||||
|
@ -243,7 +248,7 @@ class SessionManager:
|
|||
await self.session_event.wait()
|
||||
self.session_event.clear()
|
||||
if not paused and len(self.sessions) >= max_sessions:
|
||||
self.bp.status_server.set_unavailable()
|
||||
self.on_unavailable_callback()
|
||||
self.logger.info(f'maximum sessions {max_sessions:,d} '
|
||||
f'reached, stopping new connections until '
|
||||
f'count drops to {low_watermark:,d}')
|
||||
|
@ -252,7 +257,7 @@ class SessionManager:
|
|||
# Start listening for incoming connections if paused and
|
||||
# session count has fallen
|
||||
if paused and len(self.sessions) <= low_watermark:
|
||||
self.bp.status_server.set_available()
|
||||
self.on_available_callback()
|
||||
self.logger.info('resuming listening for incoming connections')
|
||||
await self._start_external_servers()
|
||||
paused = False
|
||||
|
@ -533,7 +538,7 @@ class SessionManager:
|
|||
await self.start_other()
|
||||
await self._start_external_servers()
|
||||
server_listening_event.set()
|
||||
self.bp.status_server.set_available()
|
||||
self.on_available_callback()
|
||||
# Peer discovery should start after the external servers
|
||||
# because we connect to ourself
|
||||
await asyncio.wait([
|
||||
|
@ -628,8 +633,9 @@ class SessionManager:
|
|||
for hashX in touched.intersection(self.mempool_statuses.keys()):
|
||||
self.mempool_statuses.pop(hashX, None)
|
||||
|
||||
# self.bp._chain_executor
|
||||
await asyncio.get_event_loop().run_in_executor(
|
||||
self.bp._chain_executor, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
|
||||
None, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
|
||||
)
|
||||
|
||||
if touched or new_touched or (height_changed and self.mempool_statuses):
|
||||
|
@ -866,8 +872,7 @@ class LBRYElectrumX(SessionBase):
|
|||
self.protocol_tuple = self.PROTOCOL_MIN
|
||||
self.protocol_string = None
|
||||
self.daemon = self.session_mgr.daemon
|
||||
self.bp: BlockProcessor = self.session_mgr.bp
|
||||
self.db: LevelDB = self.bp.db
|
||||
self.db: LevelDB = self.session_mgr.db
|
||||
|
||||
@classmethod
|
||||
def protocol_min_max_strings(cls):
|
||||
|
@ -1008,21 +1013,21 @@ class LBRYElectrumX(SessionBase):
|
|||
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
|
||||
|
||||
async def _cached_resolve_url(self, url):
|
||||
if url not in self.bp.resolve_cache:
|
||||
self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
|
||||
return self.bp.resolve_cache[url]
|
||||
if url not in self.session_mgr.resolve_cache:
|
||||
self.session_mgr.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
|
||||
return self.session_mgr.resolve_cache[url]
|
||||
|
||||
async def claimtrie_resolve(self, *urls) -> str:
|
||||
sorted_urls = tuple(sorted(urls))
|
||||
self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
|
||||
try:
|
||||
if sorted_urls in self.bp.resolve_outputs_cache:
|
||||
return self.bp.resolve_outputs_cache[sorted_urls]
|
||||
if sorted_urls in self.session_mgr.resolve_outputs_cache:
|
||||
return self.session_mgr.resolve_outputs_cache[sorted_urls]
|
||||
rows, extra = [], []
|
||||
for url in urls:
|
||||
if url not in self.bp.resolve_cache:
|
||||
self.bp.resolve_cache[url] = await self._cached_resolve_url(url)
|
||||
stream, channel, repost, reposted_channel = self.bp.resolve_cache[url]
|
||||
if url not in self.session_mgr.resolve_cache:
|
||||
self.session_mgr.resolve_cache[url] = await self._cached_resolve_url(url)
|
||||
stream, channel, repost, reposted_channel = self.session_mgr.resolve_cache[url]
|
||||
if isinstance(channel, ResolveCensoredError):
|
||||
rows.append(channel)
|
||||
extra.append(channel.censor_row)
|
||||
|
@ -1047,7 +1052,7 @@ class LBRYElectrumX(SessionBase):
|
|||
if reposted_channel:
|
||||
extra.append(reposted_channel)
|
||||
await asyncio.sleep(0)
|
||||
self.bp.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
|
||||
self.session_mgr.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
|
||||
None, Outputs.to_base64, rows, extra, 0, None, None
|
||||
)
|
||||
return result
|
||||
|
@ -1055,7 +1060,7 @@ class LBRYElectrumX(SessionBase):
|
|||
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
|
||||
|
||||
async def get_server_height(self):
|
||||
return self.bp.height
|
||||
return self.db.db_height
|
||||
|
||||
async def transaction_get_height(self, tx_hash):
|
||||
self.assert_tx_hash(tx_hash)
|
||||
|
@ -1467,7 +1472,7 @@ class LBRYElectrumX(SessionBase):
|
|||
if mempool_tx:
|
||||
raw_tx, block_hash = mempool_tx.raw_tx.hex(), None
|
||||
else:
|
||||
tx_info = await self.daemon_request('getrawtransaction', tx_hash, True)
|
||||
tx_info = await self.daemon_request('getrawtransaction', tx_hash, 1)
|
||||
raw_tx = tx_info['hex']
|
||||
block_hash = tx_info.get('blockhash')
|
||||
if block_hash:
|
||||
|
@ -1504,7 +1509,7 @@ class LBRYElectrumX(SessionBase):
|
|||
if verbose not in (True, False):
|
||||
raise RPCError(BAD_REQUEST, f'"verbose" must be a boolean')
|
||||
|
||||
return await self.daemon_request('getrawtransaction', tx_hash, verbose)
|
||||
return await self.daemon_request('getrawtransaction', tx_hash, int(verbose))
|
||||
|
||||
def _get_merkle_branch(self, tx_hashes, tx_pos):
|
||||
"""Return a merkle branch to a transaction.
|
||||
|
|
|
@ -40,22 +40,17 @@ def checkrecord(record, expected_winner, expected_claim):
|
|||
|
||||
|
||||
async def checkcontrolling(daemon: Daemon, db: SQLDB):
|
||||
records, claim_ids, names, futs = [], [], [], []
|
||||
records, names, futs = [], [], []
|
||||
for record in db.get_claims('claimtrie.claim_hash as is_controlling, claim.*', is_controlling=True):
|
||||
records.append(record)
|
||||
claim_id = hex_reverted(record['claim_hash'])
|
||||
claim_ids.append((claim_id,))
|
||||
names.append((record['normalized'],))
|
||||
names.append((record['normalized'], (claim_id,), "", True)) # last parameter is IncludeValues
|
||||
if len(names) > 50000:
|
||||
futs.append(daemon._send_vector('getvalueforname', names[:]))
|
||||
futs.append(daemon._send_vector('getclaimbyid', claim_ids[:]))
|
||||
futs.append(daemon._send_vector('getclaimsfornamebyid', names))
|
||||
names.clear()
|
||||
claim_ids.clear()
|
||||
if names:
|
||||
futs.append(daemon._send_vector('getvalueforname', names[:]))
|
||||
futs.append(daemon._send_vector('getclaimbyid', claim_ids[:]))
|
||||
futs.append(daemon._send_vector('getclaimsfornamebyid', names))
|
||||
names.clear()
|
||||
claim_ids.clear()
|
||||
|
||||
while futs:
|
||||
winners, claims = futs.pop(0), futs.pop(0)
|
||||
|
|
10
setup.py
10
setup.py
|
@ -7,9 +7,11 @@ BASE = os.path.dirname(__file__)
|
|||
with open(os.path.join(BASE, 'README.md'), encoding='utf-8') as fh:
|
||||
long_description = fh.read()
|
||||
|
||||
PLYVEL = []
|
||||
if sys.platform.startswith('linux'):
|
||||
PLYVEL.append('plyvel==1.3.0')
|
||||
|
||||
ROCKSDB = []
|
||||
if sys.platform.startswith('linux') or sys.platform.startswith('darwin'):
|
||||
ROCKSDB.append('lbry-rocksdb==0.8.1')
|
||||
|
||||
|
||||
setup(
|
||||
name=__name__,
|
||||
|
@ -57,7 +59,7 @@ setup(
|
|||
'pylru==1.1.0',
|
||||
'elasticsearch==7.10.1',
|
||||
'grpcio==1.38.0'
|
||||
] + PLYVEL,
|
||||
] + ROCKSDB,
|
||||
extras_require={
|
||||
'torrent': ['lbry-libtorrent'],
|
||||
'lint': ['pylint==2.10.0'],
|
||||
|
|
47
test_rocksdb.py
Executable file
47
test_rocksdb.py
Executable file
|
@ -0,0 +1,47 @@
|
|||
#! python
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import rocksdb
|
||||
import tempfile
|
||||
import logging
|
||||
|
||||
log = logging.getLogger()
|
||||
log.addHandler(logging.StreamHandler())
|
||||
log.setLevel(logging.INFO)
|
||||
|
||||
def _main(db_loc):
|
||||
opts = rocksdb.Options(create_if_missing=True)
|
||||
db = rocksdb.DB(os.path.join(db_loc, "test"), opts)
|
||||
secondary_location = os.path.join(db_loc, "secondary")
|
||||
secondary = rocksdb.DB(
|
||||
os.path.join(db_loc, "test"),
|
||||
rocksdb.Options(create_if_missing=True, max_open_files=-1),
|
||||
secondary_name=secondary_location
|
||||
)
|
||||
try:
|
||||
assert secondary.get(b"a") is None
|
||||
db.put(b"a", b"b")
|
||||
assert db.get(b"a") == b"b"
|
||||
assert secondary.get(b"a") is None
|
||||
|
||||
secondary.try_catch_up_with_primary()
|
||||
assert secondary.get(b"a") == b"b"
|
||||
finally:
|
||||
secondary.close()
|
||||
db.close()
|
||||
|
||||
|
||||
def main():
|
||||
db_dir = tempfile.mkdtemp()
|
||||
try:
|
||||
_main(db_dir)
|
||||
log.info("rocksdb %s (%s) works!", rocksdb.__version__, rocksdb.ROCKSDB_VERSION)
|
||||
except:
|
||||
log.exception("boom")
|
||||
finally:
|
||||
shutil.rmtree(db_dir)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -103,7 +103,7 @@ class AccountManagement(CommandTestCase):
|
|||
second_account = await self.daemon.jsonrpc_account_create('second account')
|
||||
|
||||
tx = await self.daemon.jsonrpc_account_send(
|
||||
'0.05', await self.daemon.jsonrpc_address_unused(account_id=second_account.id)
|
||||
'0.05', await self.daemon.jsonrpc_address_unused(account_id=second_account.id), blocking=True
|
||||
)
|
||||
await self.confirm_tx(tx.id)
|
||||
await self.assertOutputAmount(['0.05', '9.949876'], utxo_list())
|
||||
|
|
|
@ -112,7 +112,7 @@ class BlockchainReorganizationTests(CommandTestCase):
|
|||
|
||||
# reorg the last block dropping our claim tx
|
||||
await self.blockchain.invalidate_block(invalidated_block_hash)
|
||||
await self.blockchain.clear_mempool()
|
||||
await self.conductor.clear_mempool()
|
||||
await self.blockchain.generate(2)
|
||||
|
||||
# wait for the client to catch up and verify the reorg
|
||||
|
@ -191,7 +191,7 @@ class BlockchainReorganizationTests(CommandTestCase):
|
|||
|
||||
# reorg the last block dropping our claim tx
|
||||
await self.blockchain.invalidate_block(invalidated_block_hash)
|
||||
await self.blockchain.clear_mempool()
|
||||
await self.conductor.clear_mempool()
|
||||
await self.blockchain.generate(2)
|
||||
|
||||
# wait for the client to catch up and verify the reorg
|
||||
|
|
|
@ -101,12 +101,7 @@ class ReconnectTests(IntegrationTestCase):
|
|||
self.ledger.network.client.transport.close()
|
||||
self.assertFalse(self.ledger.network.is_connected)
|
||||
await self.ledger.resolve([], 'derp')
|
||||
sendtxid = await self.blockchain.send_to_address(address1, 1.1337)
|
||||
# await self.ledger.resolve([], 'derp')
|
||||
# self.assertTrue(self.ledger.network.is_connected)
|
||||
await asyncio.wait_for(self.on_transaction_id(sendtxid), 10.0) # mempool
|
||||
await self.blockchain.generate(1)
|
||||
await self.on_transaction_id(sendtxid) # confirmed
|
||||
sendtxid = await self.send_to_address_and_wait(address1, 1.1337, 1)
|
||||
self.assertLess(self.ledger.network.client.response_time, 1) # response time properly set lower, we are fine
|
||||
|
||||
await self.assertBalance(self.account, '1.1337')
|
||||
|
@ -135,7 +130,7 @@ class ReconnectTests(IntegrationTestCase):
|
|||
await self.conductor.spv_node.stop()
|
||||
self.assertFalse(self.ledger.network.is_connected)
|
||||
await asyncio.sleep(0.2) # let it retry and fail once
|
||||
await self.conductor.spv_node.start(self.conductor.blockchain_node)
|
||||
await self.conductor.spv_node.start(self.conductor.lbcwallet_node)
|
||||
await self.ledger.network.on_connected.first
|
||||
self.assertTrue(self.ledger.network.is_connected)
|
||||
|
||||
|
@ -165,8 +160,10 @@ class UDPServerFailDiscoveryTest(AsyncioTestCase):
|
|||
async def test_wallet_connects_despite_lack_of_udp(self):
|
||||
conductor = Conductor()
|
||||
conductor.spv_node.udp_port = '0'
|
||||
await conductor.start_blockchain()
|
||||
self.addCleanup(conductor.stop_blockchain)
|
||||
await conductor.start_lbcd()
|
||||
self.addCleanup(conductor.stop_lbcd)
|
||||
await conductor.start_lbcwallet()
|
||||
self.addCleanup(conductor.stop_lbcwallet)
|
||||
await conductor.start_spv()
|
||||
self.addCleanup(conductor.stop_spv)
|
||||
self.assertFalse(conductor.spv_node.server.bp.status_server.is_running)
|
||||
|
|
|
@ -174,8 +174,7 @@ class PurchaseCommandTests(CommandTestCase):
|
|||
self.merchant_address = await self.account.receiving.get_or_create_usable_address()
|
||||
daemon2 = await self.add_daemon()
|
||||
address2 = await daemon2.wallet_manager.default_account.receiving.get_or_create_usable_address()
|
||||
sendtxid = await self.blockchain.send_to_address(address2, 2)
|
||||
await self.confirm_tx(sendtxid, daemon2.ledger)
|
||||
await self.send_to_address_and_wait(address2, 2, 1, ledger=daemon2.ledger)
|
||||
|
||||
stream = await self.priced_stream('a', '1.0')
|
||||
await self.assertBalance(self.account, '9.987893')
|
||||
|
|
|
@ -23,7 +23,7 @@ class WalletCommands(CommandTestCase):
|
|||
async def test_wallet_syncing_status(self):
|
||||
address = await self.daemon.jsonrpc_address_unused()
|
||||
self.assertFalse(self.daemon.jsonrpc_wallet_status()['is_syncing'])
|
||||
await self.blockchain.send_to_address(address, 1)
|
||||
await self.send_to_address_and_wait(address, 1)
|
||||
await self.ledger._update_tasks.started.wait()
|
||||
self.assertTrue(self.daemon.jsonrpc_wallet_status()['is_syncing'])
|
||||
await self.ledger._update_tasks.done.wait()
|
||||
|
@ -49,7 +49,7 @@ class WalletCommands(CommandTestCase):
|
|||
self.assertEqual(status['wallet']['servers'][0]['port'], 50002)
|
||||
await self.conductor.spv_node.stop(True)
|
||||
self.conductor.spv_node.port = 54320
|
||||
await self.conductor.spv_node.start(self.conductor.blockchain_node)
|
||||
await self.conductor.spv_node.start(self.conductor.lbcwallet_node)
|
||||
status = await self.daemon.jsonrpc_status()
|
||||
self.assertEqual(len(status['wallet']['servers']), 0)
|
||||
self.daemon.jsonrpc_settings_set('lbryum_servers', ['localhost:54320'])
|
||||
|
@ -59,23 +59,22 @@ class WalletCommands(CommandTestCase):
|
|||
self.assertEqual(status['wallet']['servers'][0]['port'], 54320)
|
||||
|
||||
async def test_sending_to_scripthash_address(self):
|
||||
self.assertEqual(await self.blockchain.get_balance(), '95.99973580')
|
||||
bal = await self.blockchain.get_balance()
|
||||
await self.assertBalance(self.account, '10.0')
|
||||
p2sh_address1 = await self.blockchain.get_new_address(self.blockchain.P2SH_SEGWIT_ADDRESS)
|
||||
tx = await self.account_send('2.0', p2sh_address1)
|
||||
self.assertEqual(tx['outputs'][0]['address'], p2sh_address1)
|
||||
self.assertEqual(await self.blockchain.get_balance(), '98.99973580') # +1 lbc for confirm block
|
||||
self.assertEqual(await self.blockchain.get_balance(), str(float(bal)+3)) # +1 lbc for confirm block
|
||||
await self.assertBalance(self.account, '7.999877')
|
||||
await self.wallet_send('3.0', p2sh_address1)
|
||||
self.assertEqual(await self.blockchain.get_balance(), '102.99973580') # +1 lbc for confirm block
|
||||
self.assertEqual(await self.blockchain.get_balance(), str(float(bal)+7)) # +1 lbc for confirm block
|
||||
await self.assertBalance(self.account, '4.999754')
|
||||
|
||||
async def test_balance_caching(self):
|
||||
account2 = await self.daemon.jsonrpc_account_create("Tip-er")
|
||||
address2 = await self.daemon.jsonrpc_address_unused(account2.id)
|
||||
sendtxid = await self.blockchain.send_to_address(address2, 10)
|
||||
await self.confirm_tx(sendtxid)
|
||||
await self.generate(1)
|
||||
await self.send_to_address_and_wait(address2, 10, 2)
|
||||
await self.ledger.tasks_are_done() # don't mess with the query count while we need it
|
||||
|
||||
wallet_balance = self.daemon.jsonrpc_wallet_balance
|
||||
ledger = self.ledger
|
||||
|
@ -90,14 +89,16 @@ class WalletCommands(CommandTestCase):
|
|||
self.assertIsNone(ledger._balance_cache.get(self.account.id))
|
||||
|
||||
query_count += 2
|
||||
self.assertEqual(await wallet_balance(), expected)
|
||||
balance = await wallet_balance()
|
||||
self.assertEqual(self.ledger.db.db.query_count, query_count)
|
||||
self.assertEqual(balance, expected)
|
||||
self.assertEqual(dict_values_to_lbc(ledger._balance_cache.get(self.account.id))['total'], '10.0')
|
||||
self.assertEqual(dict_values_to_lbc(ledger._balance_cache.get(account2.id))['total'], '10.0')
|
||||
|
||||
# calling again uses cache
|
||||
self.assertEqual(await wallet_balance(), expected)
|
||||
balance = await wallet_balance()
|
||||
self.assertEqual(self.ledger.db.db.query_count, query_count)
|
||||
self.assertEqual(balance, expected)
|
||||
self.assertEqual(dict_values_to_lbc(ledger._balance_cache.get(self.account.id))['total'], '10.0')
|
||||
self.assertEqual(dict_values_to_lbc(ledger._balance_cache.get(account2.id))['total'], '10.0')
|
||||
|
||||
|
@ -123,8 +124,7 @@ class WalletCommands(CommandTestCase):
|
|||
wallet2 = await self.daemon.jsonrpc_wallet_create('foo', create_account=True)
|
||||
account3 = wallet2.default_account
|
||||
address3 = await self.daemon.jsonrpc_address_unused(account3.id, wallet2.id)
|
||||
await self.confirm_tx(await self.blockchain.send_to_address(address3, 1))
|
||||
await self.generate(1)
|
||||
await self.send_to_address_and_wait(address3, 1, 1)
|
||||
|
||||
account_balance = self.daemon.jsonrpc_account_balance
|
||||
wallet_balance = self.daemon.jsonrpc_wallet_balance
|
||||
|
@ -154,7 +154,7 @@ class WalletCommands(CommandTestCase):
|
|||
address2 = await self.daemon.jsonrpc_address_unused(account2.id)
|
||||
|
||||
# send lbc to someone else
|
||||
tx = await self.daemon.jsonrpc_account_send('1.0', address2)
|
||||
tx = await self.daemon.jsonrpc_account_send('1.0', address2, blocking=True)
|
||||
await self.confirm_tx(tx.id)
|
||||
self.assertEqual(await account_balance(), {
|
||||
'total': '8.97741',
|
||||
|
@ -187,7 +187,7 @@ class WalletCommands(CommandTestCase):
|
|||
})
|
||||
|
||||
# tip claimed
|
||||
tx = await self.daemon.jsonrpc_support_abandon(txid=support1['txid'], nout=0)
|
||||
tx = await self.daemon.jsonrpc_support_abandon(txid=support1['txid'], nout=0, blocking=True)
|
||||
await self.confirm_tx(tx.id)
|
||||
self.assertEqual(await account_balance(), {
|
||||
'total': '9.277303',
|
||||
|
@ -238,8 +238,7 @@ class WalletEncryptionAndSynchronization(CommandTestCase):
|
|||
"carbon smart garage balance margin twelve"
|
||||
)
|
||||
address = (await self.daemon2.wallet_manager.default_account.receiving.get_addresses(limit=1, only_usable=True))[0]
|
||||
sendtxid = await self.blockchain.send_to_address(address, 1)
|
||||
await self.confirm_tx(sendtxid, self.daemon2.ledger)
|
||||
await self.send_to_address_and_wait(address, 1, 1, ledger=self.daemon2.ledger)
|
||||
|
||||
def assertWalletEncrypted(self, wallet_path, encrypted):
|
||||
with open(wallet_path) as opened:
|
||||
|
@ -294,7 +293,7 @@ class WalletEncryptionAndSynchronization(CommandTestCase):
|
|||
'3056301006072a8648ce3d020106052b8104000a034200049ae7283f3f6723e0a1'
|
||||
'66b7e19e1d1167f6dc5f4af61b4a58066a0d2a8bed2b35c66bccb4ec3eba316b16'
|
||||
'a97a6d6a4a8effd29d748901bb9789352519cd00b13d'
|
||||
), self.daemon2)
|
||||
), self.daemon2, blocking=True)
|
||||
await self.confirm_tx(channel['txid'], self.daemon2.ledger)
|
||||
|
||||
# both daemons will have the channel but only one has the cert so far
|
||||
|
|
|
@ -494,8 +494,7 @@ class ClaimSearchCommand(ClaimTestCase):
|
|||
tx = await Transaction.claim_create(
|
||||
'unknown', b'{"sources":{"lbry_sd_hash":""}}', 1, address, [self.account], self.account)
|
||||
await tx.sign([self.account])
|
||||
await self.broadcast(tx)
|
||||
await self.confirm_tx(tx.id)
|
||||
await self.broadcast_and_confirm(tx)
|
||||
|
||||
octet = await self.stream_create()
|
||||
video = await self.stream_create('chrome', file_path=self.video_file_name)
|
||||
|
@ -1226,7 +1225,7 @@ class ChannelCommands(CommandTestCase):
|
|||
data_to_sign = "CAFEBABE"
|
||||
# claim new name
|
||||
await self.channel_create('@someotherchan')
|
||||
channel_tx = await self.daemon.jsonrpc_channel_create('@signer', '0.1')
|
||||
channel_tx = await self.daemon.jsonrpc_channel_create('@signer', '0.1', blocking=True)
|
||||
await self.confirm_tx(channel_tx.id)
|
||||
channel = channel_tx.outputs[0]
|
||||
signature1 = await self.out(self.daemon.jsonrpc_channel_sign(channel_name='@signer', hexdata=data_to_sign))
|
||||
|
@ -1373,7 +1372,7 @@ class StreamCommands(ClaimTestCase):
|
|||
self.assertEqual('8.989893', (await self.daemon.jsonrpc_account_balance())['available'])
|
||||
|
||||
result = await self.out(self.daemon.jsonrpc_account_send(
|
||||
'5.0', await self.daemon.jsonrpc_address_unused(account2_id)
|
||||
'5.0', await self.daemon.jsonrpc_address_unused(account2_id), blocking=True
|
||||
))
|
||||
await self.confirm_tx(result['txid'])
|
||||
|
||||
|
@ -1555,7 +1554,7 @@ class StreamCommands(ClaimTestCase):
|
|||
)
|
||||
# test setting from env vars and starting from scratch
|
||||
await self.conductor.spv_node.stop(False)
|
||||
await self.conductor.spv_node.start(self.conductor.blockchain_node,
|
||||
await self.conductor.spv_node.start(self.conductor.lbcwallet_node,
|
||||
extraconf={'BLOCKING_CHANNEL_IDS': blocking_channel_id,
|
||||
'FILTERING_CHANNEL_IDS': filtering_channel_id})
|
||||
await self.daemon.wallet_manager.reset()
|
||||
|
@ -2172,7 +2171,7 @@ class SupportCommands(CommandTestCase):
|
|||
tip = await self.out(
|
||||
self.daemon.jsonrpc_support_create(
|
||||
claim_id, '1.0', True, account_id=account2.id, wallet_id='wallet2',
|
||||
funding_account_ids=[account2.id])
|
||||
funding_account_ids=[account2.id], blocking=True)
|
||||
)
|
||||
await self.confirm_tx(tip['txid'])
|
||||
|
||||
|
@ -2204,7 +2203,7 @@ class SupportCommands(CommandTestCase):
|
|||
support = await self.out(
|
||||
self.daemon.jsonrpc_support_create(
|
||||
claim_id, '2.0', False, account_id=account2.id, wallet_id='wallet2',
|
||||
funding_account_ids=[account2.id])
|
||||
funding_account_ids=[account2.id], blocking=True)
|
||||
)
|
||||
await self.confirm_tx(support['txid'])
|
||||
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import unittest
|
||||
from unittest import skipIf
|
||||
import asyncio
|
||||
import os
|
||||
|
@ -36,8 +37,7 @@ class FileCommands(CommandTestCase):
|
|||
tx_to_update.outputs[0], claim, 1, address, [self.account], self.account
|
||||
)
|
||||
await tx.sign([self.account])
|
||||
await self.broadcast(tx)
|
||||
await self.confirm_tx(tx.id)
|
||||
await self.broadcast_and_confirm(tx)
|
||||
self.client_session = self.daemon.file_manager.source_managers['torrent'].torrent_session
|
||||
self.client_session._session.add_dht_node(('localhost', 4040))
|
||||
self.client_session.wait_start = False # fixme: this is super slow on tests
|
||||
|
@ -222,6 +222,7 @@ class FileCommands(CommandTestCase):
|
|||
await self.wait_files_to_complete()
|
||||
self.assertNotEqual(first_path, second_path)
|
||||
|
||||
@unittest.SkipTest # FIXME: claimname/updateclaim is gone. #3480 wip, unblock #3479"
|
||||
async def test_file_list_updated_metadata_on_resolve(self):
|
||||
await self.stream_create('foo', '0.01')
|
||||
txo = (await self.daemon.resolve(self.wallet.accounts, ['lbry://foo']))['lbry://foo']
|
||||
|
@ -510,8 +511,7 @@ class FileCommands(CommandTestCase):
|
|||
tx.outputs[0].claim.stream.fee.address_bytes = b''
|
||||
tx.outputs[0].script.generate()
|
||||
await tx.sign([self.account])
|
||||
await self.broadcast(tx)
|
||||
await self.confirm_tx(tx.id)
|
||||
await self.broadcast_and_confirm(tx)
|
||||
|
||||
async def __raw_value_update_no_fee_amount(self, tx, claim_address):
|
||||
tx = await self.daemon.jsonrpc_stream_update(
|
||||
|
@ -521,8 +521,7 @@ class FileCommands(CommandTestCase):
|
|||
tx.outputs[0].claim.stream.fee.message.ClearField('amount')
|
||||
tx.outputs[0].script.generate()
|
||||
await tx.sign([self.account])
|
||||
await self.broadcast(tx)
|
||||
await self.confirm_tx(tx.id)
|
||||
await self.broadcast_and_confirm(tx)
|
||||
|
||||
|
||||
class DiskSpaceManagement(CommandTestCase):
|
||||
|
|
|
@ -80,7 +80,7 @@ class EpicAdventuresOfChris45(CommandTestCase):
|
|||
|
||||
# After some soul searching Chris decides that his story needs more
|
||||
# heart and a better ending. He takes down the story and begins the rewrite.
|
||||
abandon = await self.out(self.daemon.jsonrpc_stream_abandon(claim_id, blocking=False))
|
||||
abandon = await self.out(self.daemon.jsonrpc_stream_abandon(claim_id, blocking=True))
|
||||
self.assertEqual(abandon['inputs'][0]['claim_id'], claim_id)
|
||||
await self.confirm_tx(abandon['txid'])
|
||||
|
||||
|
@ -103,7 +103,7 @@ class EpicAdventuresOfChris45(CommandTestCase):
|
|||
# 1 LBC to which Chris readily obliges
|
||||
ramsey_account_id = (await self.out(self.daemon.jsonrpc_account_create("Ramsey")))['id']
|
||||
ramsey_address = await self.daemon.jsonrpc_address_unused(ramsey_account_id)
|
||||
result = await self.out(self.daemon.jsonrpc_account_send('1.0', ramsey_address))
|
||||
result = await self.out(self.daemon.jsonrpc_account_send('1.0', ramsey_address, blocking=True))
|
||||
self.assertIn("txid", result)
|
||||
await self.confirm_tx(result['txid'])
|
||||
|
||||
|
@ -133,7 +133,7 @@ class EpicAdventuresOfChris45(CommandTestCase):
|
|||
# And voila, and bravo and encore! His Best Friend Ramsey read the story and immediately knew this was a hit
|
||||
# Now to keep this claim winning on the lbry blockchain he immediately supports the claim
|
||||
tx = await self.out(self.daemon.jsonrpc_support_create(
|
||||
claim_id2, '0.2', account_id=ramsey_account_id
|
||||
claim_id2, '0.2', account_id=ramsey_account_id, blocking=True
|
||||
))
|
||||
await self.confirm_tx(tx['txid'])
|
||||
|
||||
|
@ -147,7 +147,7 @@ class EpicAdventuresOfChris45(CommandTestCase):
|
|||
# Now he also wanted to support the original creator of the Award Winning Novel
|
||||
# So he quickly decides to send a tip to him
|
||||
tx = await self.out(
|
||||
self.daemon.jsonrpc_support_create(claim_id2, '0.3', tip=True, account_id=ramsey_account_id)
|
||||
self.daemon.jsonrpc_support_create(claim_id2, '0.3', tip=True, account_id=ramsey_account_id, blocking=True)
|
||||
)
|
||||
await self.confirm_tx(tx['txid'])
|
||||
|
||||
|
@ -158,7 +158,7 @@ class EpicAdventuresOfChris45(CommandTestCase):
|
|||
await self.generate(5)
|
||||
|
||||
# Seeing the ravishing success of his novel Chris adds support to his claim too
|
||||
tx = await self.out(self.daemon.jsonrpc_support_create(claim_id2, '0.4'))
|
||||
tx = await self.out(self.daemon.jsonrpc_support_create(claim_id2, '0.4', blocking=True))
|
||||
await self.confirm_tx(tx['txid'])
|
||||
|
||||
# And check if his support showed up
|
||||
|
@ -183,7 +183,7 @@ class EpicAdventuresOfChris45(CommandTestCase):
|
|||
|
||||
# But sadly Ramsey wasn't so pleased. It was hard for him to tell Chris...
|
||||
# Chris, though a bit heartbroken, abandoned the claim for now, but instantly started working on new hit lyrics
|
||||
abandon = await self.out(self.daemon.jsonrpc_stream_abandon(txid=tx['txid'], nout=0, blocking=False))
|
||||
abandon = await self.out(self.daemon.jsonrpc_stream_abandon(txid=tx['txid'], nout=0, blocking=True))
|
||||
self.assertTrue(abandon['inputs'][0]['txid'], tx['txid'])
|
||||
await self.confirm_tx(abandon['txid'])
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import asyncio
|
||||
import json
|
||||
import hashlib
|
||||
import sys
|
||||
from bisect import bisect_right
|
||||
from binascii import hexlify, unhexlify
|
||||
from collections import defaultdict
|
||||
|
@ -31,13 +32,13 @@ class BaseResolveTestCase(CommandTestCase):
|
|||
self.assertEqual(claim_from_es['effective_amount'], claim_from_db.effective_amount)
|
||||
|
||||
def assertMatchDBClaim(self, expected, claim):
|
||||
self.assertEqual(expected['claimId'], claim.claim_hash.hex())
|
||||
self.assertEqual(expected['validAtHeight'], claim.activation_height)
|
||||
self.assertEqual(expected['lastTakeoverHeight'], claim.last_takeover_height)
|
||||
self.assertEqual(expected['txId'], claim.tx_hash[::-1].hex())
|
||||
self.assertEqual(expected['claimid'], claim.claim_hash.hex())
|
||||
self.assertEqual(expected['validatheight'], claim.activation_height)
|
||||
self.assertEqual(expected['lasttakeoverheight'], claim.last_takeover_height)
|
||||
self.assertEqual(expected['txid'], claim.tx_hash[::-1].hex())
|
||||
self.assertEqual(expected['n'], claim.position)
|
||||
self.assertEqual(expected['amount'], claim.amount)
|
||||
self.assertEqual(expected['effectiveAmount'], claim.effective_amount)
|
||||
self.assertEqual(expected['effectiveamount'], claim.effective_amount)
|
||||
|
||||
async def assertResolvesToClaimId(self, name, claim_id):
|
||||
other = await self.resolve(name)
|
||||
|
@ -53,9 +54,10 @@ class BaseResolveTestCase(CommandTestCase):
|
|||
self.assertEqual(claim_id, claim_from_es[0][0]['claim_hash'][::-1].hex())
|
||||
|
||||
async def assertNoClaimForName(self, name: str):
|
||||
lbrycrd_winning = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))
|
||||
lbrycrd_winning = json.loads(await self.blockchain._cli_cmnd('getclaimsforname', name))
|
||||
stream, channel, _, _ = await self.conductor.spv_node.server.bp.db.resolve(name)
|
||||
self.assertNotIn('claimId', lbrycrd_winning)
|
||||
if 'claims' in lbrycrd_winning and lbrycrd_winning['claims'] is not None:
|
||||
self.assertEqual(len(lbrycrd_winning['claims']), 0)
|
||||
if stream is not None:
|
||||
self.assertIsInstance(stream, LookupError)
|
||||
else:
|
||||
|
@ -63,20 +65,23 @@ class BaseResolveTestCase(CommandTestCase):
|
|||
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(name=name)
|
||||
self.assertListEqual([], claim_from_es[0])
|
||||
|
||||
async def assertNoClaim(self, claim_id: str):
|
||||
self.assertDictEqual(
|
||||
{}, json.loads(await self.blockchain._cli_cmnd('getclaimbyid', claim_id))
|
||||
)
|
||||
async def assertNoClaim(self, name: str, claim_id: str):
|
||||
expected = json.loads(await self.blockchain._cli_cmnd('getclaimsfornamebyid', name, '["' + claim_id + '"]'))
|
||||
if 'claims' in expected and expected['claims'] is not None:
|
||||
# ensure that if we do have the matching claim that it is not active
|
||||
self.assertEqual(expected['claims'][0]['effectiveamount'], 0)
|
||||
|
||||
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(claim_id=claim_id)
|
||||
self.assertListEqual([], claim_from_es[0])
|
||||
claim = await self.conductor.spv_node.server.bp.db.fs_getclaimbyid(claim_id)
|
||||
self.assertIsNone(claim)
|
||||
|
||||
async def assertMatchWinningClaim(self, name):
|
||||
expected = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))
|
||||
expected = json.loads(await self.blockchain._cli_cmnd('getclaimsfornamebybid', name, "[0]"))
|
||||
stream, channel, _, _ = await self.conductor.spv_node.server.bp.db.resolve(name)
|
||||
claim = stream if stream else channel
|
||||
await self._assertMatchClaim(expected, claim)
|
||||
expected['claims'][0]['lasttakeoverheight'] = expected['lasttakeoverheight']
|
||||
await self._assertMatchClaim(expected['claims'][0], claim)
|
||||
return claim
|
||||
|
||||
async def _assertMatchClaim(self, expected, claim):
|
||||
|
@ -86,70 +91,81 @@ class BaseResolveTestCase(CommandTestCase):
|
|||
)
|
||||
self.assertEqual(len(claim_from_es[0]), 1)
|
||||
self.assertMatchESClaim(claim_from_es[0][0], claim)
|
||||
self._check_supports(claim.claim_hash.hex(), expected['supports'], claim_from_es[0][0]['support_amount'])
|
||||
self._check_supports(claim.claim_hash.hex(), expected.get('supports', []),
|
||||
claim_from_es[0][0]['support_amount'])
|
||||
|
||||
async def assertMatchClaim(self, claim_id, is_active_in_lbrycrd=True):
|
||||
expected = json.loads(await self.blockchain._cli_cmnd('getclaimbyid', claim_id))
|
||||
async def assertMatchClaim(self, name, claim_id, is_active_in_lbrycrd=True):
|
||||
claim = await self.conductor.spv_node.server.bp.db.fs_getclaimbyid(claim_id)
|
||||
if is_active_in_lbrycrd:
|
||||
if not expected:
|
||||
self.assertIsNone(claim)
|
||||
return
|
||||
self.assertMatchDBClaim(expected, claim)
|
||||
else:
|
||||
self.assertDictEqual({}, expected)
|
||||
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(
|
||||
claim_id=claim.claim_hash.hex()
|
||||
)
|
||||
self.assertEqual(len(claim_from_es[0]), 1)
|
||||
self.assertEqual(claim_from_es[0][0]['claim_hash'][::-1].hex(), claim.claim_hash.hex())
|
||||
self.assertMatchESClaim(claim_from_es[0][0], claim)
|
||||
self._check_supports(
|
||||
claim.claim_hash.hex(), expected.get('supports', []), claim_from_es[0][0]['support_amount'],
|
||||
is_active_in_lbrycrd
|
||||
)
|
||||
|
||||
expected = json.loads(await self.blockchain._cli_cmnd('getclaimsfornamebyid', name, '["' + claim_id + '"]'))
|
||||
if is_active_in_lbrycrd:
|
||||
if not expected:
|
||||
self.assertIsNone(claim)
|
||||
return
|
||||
expected['claims'][0]['lasttakeoverheight'] = expected['lasttakeoverheight']
|
||||
self.assertMatchDBClaim(expected['claims'][0], claim)
|
||||
self._check_supports(claim.claim_hash.hex(), expected['claims'][0].get('supports', []),
|
||||
claim_from_es[0][0]['support_amount'])
|
||||
else:
|
||||
if 'claims' in expected and expected['claims'] is not None:
|
||||
# ensure that if we do have the matching claim that it is not active
|
||||
self.assertEqual(expected['claims'][0]['effectiveamount'], 0)
|
||||
return claim
|
||||
|
||||
async def assertMatchClaimIsWinning(self, name, claim_id):
|
||||
self.assertEqual(claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex())
|
||||
await self.assertMatchClaimsForName(name)
|
||||
|
||||
def _check_supports(self, claim_id, lbrycrd_supports, es_support_amount, is_active_in_lbrycrd=True):
|
||||
total_amount = 0
|
||||
def _check_supports(self, claim_id, lbrycrd_supports, es_support_amount):
|
||||
total_lbrycrd_amount = 0.0
|
||||
total_es_amount = 0.0
|
||||
active_es_amount = 0.0
|
||||
db = self.conductor.spv_node.server.bp.db
|
||||
es_supports = db.get_supports(bytes.fromhex(claim_id))
|
||||
|
||||
for i, (tx_num, position, amount) in enumerate(db.get_supports(bytes.fromhex(claim_id))):
|
||||
total_amount += amount
|
||||
if is_active_in_lbrycrd:
|
||||
support = lbrycrd_supports[i]
|
||||
self.assertEqual(support['txId'], db.prefix_db.tx_hash.get(tx_num, deserialize_value=False)[::-1].hex())
|
||||
self.assertEqual(support['n'], position)
|
||||
self.assertEqual(support['height'], bisect_right(db.tx_counts, tx_num))
|
||||
self.assertEqual(support['validAtHeight'], db.get_activation(tx_num, position, is_support=True))
|
||||
self.assertEqual(total_amount, es_support_amount, f"lbrycrd support amount: {total_amount} vs es: {es_support_amount}")
|
||||
# we're only concerned about active supports here, and they should match
|
||||
self.assertTrue(len(es_supports) >= len(lbrycrd_supports))
|
||||
|
||||
for i, (tx_num, position, amount) in enumerate(es_supports):
|
||||
total_es_amount += amount
|
||||
valid_height = db.get_activation(tx_num, position, is_support=True)
|
||||
if valid_height > db.db_height:
|
||||
continue
|
||||
active_es_amount += amount
|
||||
txid = db.prefix_db.tx_hash.get(tx_num, deserialize_value=False)[::-1].hex()
|
||||
support = next(filter(lambda s: s['txid'] == txid and s['n'] == position, lbrycrd_supports))
|
||||
total_lbrycrd_amount += support['amount']
|
||||
self.assertEqual(support['height'], bisect_right(db.tx_counts, tx_num))
|
||||
self.assertEqual(support['validatheight'], valid_height)
|
||||
|
||||
self.assertEqual(total_es_amount, es_support_amount)
|
||||
self.assertEqual(active_es_amount, total_lbrycrd_amount)
|
||||
|
||||
async def assertMatchClaimsForName(self, name):
|
||||
expected = json.loads(await self.blockchain._cli_cmnd('getclaimsforname', name))
|
||||
|
||||
expected = json.loads(await self.blockchain._cli_cmnd('getclaimsforname', name, "", "true"))
|
||||
db = self.conductor.spv_node.server.bp.db
|
||||
# self.assertEqual(len(expected['claims']), len(db_claims.claims))
|
||||
# self.assertEqual(expected['lastTakeoverHeight'], db_claims.lastTakeoverHeight)
|
||||
last_takeover = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))['lastTakeoverHeight']
|
||||
|
||||
for c in expected['claims']:
|
||||
c['lastTakeoverHeight'] = last_takeover
|
||||
claim_id = c['claimId']
|
||||
c['lasttakeoverheight'] = expected['lasttakeoverheight']
|
||||
claim_id = c['claimid']
|
||||
claim_hash = bytes.fromhex(claim_id)
|
||||
claim = db._fs_get_claim_by_hash(claim_hash)
|
||||
self.assertMatchDBClaim(c, claim)
|
||||
|
||||
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(
|
||||
claim_id=c['claimId']
|
||||
claim_id=claim_id
|
||||
)
|
||||
self.assertEqual(len(claim_from_es[0]), 1)
|
||||
self.assertEqual(claim_from_es[0][0]['claim_hash'][::-1].hex(), c['claimId'])
|
||||
self.assertEqual(claim_from_es[0][0]['claim_hash'][::-1].hex(), claim_id)
|
||||
self.assertMatchESClaim(claim_from_es[0][0], claim)
|
||||
self._check_supports(c['claimId'], c['supports'], claim_from_es[0][0]['support_amount'])
|
||||
self._check_supports(claim_id, c.get('supports', []),
|
||||
claim_from_es[0][0]['support_amount'])
|
||||
|
||||
|
||||
class ResolveCommand(BaseResolveTestCase):
|
||||
|
@ -262,18 +278,18 @@ class ResolveCommand(BaseResolveTestCase):
|
|||
self.assertEqual(claim['confirmations'], json.loads(tx_details)['confirmations'])
|
||||
|
||||
# resolve handles invalid data
|
||||
await self.blockchain_claim_name("gibberish", hexlify(b"{'invalid':'json'}").decode(), "0.1")
|
||||
await self.generate(1)
|
||||
response = await self.out(self.daemon.jsonrpc_resolve("lbry://gibberish"))
|
||||
self.assertSetEqual({'lbry://gibberish'}, set(response))
|
||||
claim = response['lbry://gibberish']
|
||||
self.assertEqual(claim['name'], 'gibberish')
|
||||
self.assertNotIn('value', claim)
|
||||
# await self.blockchain_claim_name("gibberish", hexlify(b"{'invalid':'json'}").decode(), "0.1")
|
||||
# await self.generate(1)
|
||||
# response = await self.out(self.daemon.jsonrpc_resolve("lbry://gibberish"))
|
||||
# self.assertSetEqual({'lbry://gibberish'}, set(response))
|
||||
# claim = response['lbry://gibberish']
|
||||
# self.assertEqual(claim['name'], 'gibberish')
|
||||
# self.assertNotIn('value', claim)
|
||||
|
||||
# resolve retries
|
||||
await self.conductor.spv_node.stop()
|
||||
resolve_task = asyncio.create_task(self.resolve('foo'))
|
||||
await self.conductor.spv_node.start(self.conductor.blockchain_node)
|
||||
await self.conductor.spv_node.start(self.conductor.lbcwallet_node)
|
||||
self.assertIsNotNone((await resolve_task)['claim_id'])
|
||||
|
||||
async def test_winning_by_effective_amount(self):
|
||||
|
@ -443,16 +459,16 @@ class ResolveCommand(BaseResolveTestCase):
|
|||
self.assertEqual(one, claim6['name'])
|
||||
|
||||
async def test_resolve_old_claim(self):
|
||||
channel = await self.daemon.jsonrpc_channel_create('@olds', '1.0')
|
||||
channel = await self.daemon.jsonrpc_channel_create('@olds', '1.0', blocking=True)
|
||||
await self.confirm_tx(channel.id)
|
||||
address = channel.outputs[0].get_address(self.account.ledger)
|
||||
claim = generate_signed_legacy(address, channel.outputs[0])
|
||||
tx = await Transaction.claim_create('example', claim.SerializeToString(), 1, address, [self.account], self.account)
|
||||
await tx.sign([self.account])
|
||||
await self.broadcast(tx)
|
||||
await self.confirm_tx(tx.id)
|
||||
await self.broadcast_and_confirm(tx)
|
||||
|
||||
response = await self.resolve('@olds/example')
|
||||
self.assertTrue('is_channel_signature_valid' in response, str(response))
|
||||
self.assertTrue(response['is_channel_signature_valid'])
|
||||
|
||||
claim.publisherSignature.signature = bytes(reversed(claim.publisherSignature.signature))
|
||||
|
@ -460,8 +476,7 @@ class ResolveCommand(BaseResolveTestCase):
|
|||
'bad_example', claim.SerializeToString(), 1, address, [self.account], self.account
|
||||
)
|
||||
await tx.sign([self.account])
|
||||
await self.broadcast(tx)
|
||||
await self.confirm_tx(tx.id)
|
||||
await self.broadcast_and_confirm(tx)
|
||||
|
||||
response = await self.resolve('bad_example')
|
||||
self.assertFalse(response['is_channel_signature_valid'])
|
||||
|
@ -646,7 +661,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
|
|||
self.assertEqual(height, self.conductor.spv_node.server.bp.db.db_height)
|
||||
await self.assertMatchClaimIsWinning(name, winning_claim_id)
|
||||
for non_winning in non_winning_claims:
|
||||
claim = await self.assertMatchClaim(
|
||||
claim = await self.assertMatchClaim(name,
|
||||
non_winning.claim_id, is_active_in_lbrycrd=non_winning.active_in_lbrycrd
|
||||
)
|
||||
self.assertEqual(non_winning.activation_height, claim.activation_height)
|
||||
|
@ -992,7 +1007,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
|
|||
name = 'test'
|
||||
await self.generate(494)
|
||||
address = (await self.account.receiving.get_addresses(True))[0]
|
||||
await self.blockchain.send_to_address(address, 400.0)
|
||||
await self.send_to_address_and_wait(address, 400.0)
|
||||
await self.account.ledger.on_address.first
|
||||
await self.generate(100)
|
||||
self.assertEqual(800, self.conductor.spv_node.server.bp.db.db_height)
|
||||
|
@ -1334,7 +1349,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
|
|||
|
||||
await self.generate(1)
|
||||
|
||||
await self.assertMatchClaim(first_claim_id)
|
||||
await self.assertMatchClaim(name, first_claim_id)
|
||||
await self.assertMatchClaimIsWinning(name, second_claim_id)
|
||||
|
||||
async def test_remove_controlling_support(self):
|
||||
|
@ -1405,12 +1420,12 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
|
|||
await self.generate(32)
|
||||
|
||||
second_claim_id = (await self.stream_create(name, '0.01', allow_duplicate_name=True))['outputs'][0]['claim_id']
|
||||
await self.assertNoClaim(second_claim_id)
|
||||
await self.assertNoClaim(name, second_claim_id)
|
||||
self.assertEqual(
|
||||
len((await self.conductor.spv_node.server.bp.db.search_index.search(claim_name=name))[0]), 1
|
||||
)
|
||||
await self.generate(1)
|
||||
await self.assertMatchClaim(second_claim_id)
|
||||
await self.assertMatchClaim(name, second_claim_id)
|
||||
self.assertEqual(
|
||||
len((await self.conductor.spv_node.server.bp.db.search_index.search(claim_name=name))[0]), 2
|
||||
)
|
||||
|
@ -1570,7 +1585,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
|
|||
|
||||
# reorg the last block dropping our claim tx
|
||||
await self.blockchain.invalidate_block(invalidated_block_hash)
|
||||
await self.blockchain.clear_mempool()
|
||||
await self.conductor.clear_mempool()
|
||||
await self.blockchain.generate(2)
|
||||
|
||||
# wait for the client to catch up and verify the reorg
|
||||
|
@ -1649,7 +1664,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
|
|||
|
||||
# reorg the last block dropping our claim tx
|
||||
await self.blockchain.invalidate_block(invalidated_block_hash)
|
||||
await self.blockchain.clear_mempool()
|
||||
await self.conductor.clear_mempool()
|
||||
await self.blockchain.generate(2)
|
||||
|
||||
# wait for the client to catch up and verify the reorg
|
||||
|
|
|
@ -21,9 +21,8 @@ class BasicTransactionTest(IntegrationTestCase):
|
|||
[asyncio.ensure_future(self.on_address_update(address1)),
|
||||
asyncio.ensure_future(self.on_address_update(address2))]
|
||||
))
|
||||
sendtxid1 = await self.blockchain.send_to_address(address1, 5)
|
||||
sendtxid2 = await self.blockchain.send_to_address(address2, 5)
|
||||
await self.blockchain.generate(1)
|
||||
await self.send_to_address_and_wait(address1, 5)
|
||||
await self.send_to_address_and_wait(address2, 5, 1)
|
||||
await notifications
|
||||
|
||||
self.assertEqual(d2l(await self.account.get_balance()), '10.0')
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
import unittest
|
||||
|
||||
from lbry.testcase import CommandTestCase
|
||||
|
||||
|
||||
|
@ -6,7 +8,7 @@ class TransactionCommandsTestCase(CommandTestCase):
|
|||
async def test_transaction_show(self):
|
||||
# local tx
|
||||
result = await self.out(self.daemon.jsonrpc_account_send(
|
||||
'5.0', await self.daemon.jsonrpc_address_unused(self.account.id)
|
||||
'5.0', await self.daemon.jsonrpc_address_unused(self.account.id), blocking=True
|
||||
))
|
||||
await self.confirm_tx(result['txid'])
|
||||
tx = await self.daemon.jsonrpc_transaction_show(result['txid'])
|
||||
|
@ -27,10 +29,9 @@ class TransactionCommandsTestCase(CommandTestCase):
|
|||
self.assertFalse(result['success'])
|
||||
|
||||
async def test_utxo_release(self):
|
||||
sendtxid = await self.blockchain.send_to_address(
|
||||
await self.account.receiving.get_or_create_usable_address(), 1
|
||||
await self.send_to_address_and_wait(
|
||||
await self.account.receiving.get_or_create_usable_address(), 1, 1
|
||||
)
|
||||
await self.confirm_tx(sendtxid)
|
||||
await self.assertBalance(self.account, '11.0')
|
||||
await self.ledger.reserve_outputs(await self.account.get_utxos())
|
||||
await self.assertBalance(self.account, '0.0')
|
||||
|
@ -40,6 +41,7 @@ class TransactionCommandsTestCase(CommandTestCase):
|
|||
|
||||
class TestSegwit(CommandTestCase):
|
||||
|
||||
@unittest.SkipTest
|
||||
async def test_segwit(self):
|
||||
p2sh_address1 = await self.blockchain.get_new_address(self.blockchain.P2SH_SEGWIT_ADDRESS)
|
||||
p2sh_address2 = await self.blockchain.get_new_address(self.blockchain.P2SH_SEGWIT_ADDRESS)
|
||||
|
@ -53,14 +55,13 @@ class TestSegwit(CommandTestCase):
|
|||
p2sh_txid2 = await self.blockchain.send_to_address(p2sh_address2, '1.0')
|
||||
bech32_txid1 = await self.blockchain.send_to_address(bech32_address1, '1.0')
|
||||
bech32_txid2 = await self.blockchain.send_to_address(bech32_address2, '1.0')
|
||||
|
||||
await self.generate(1)
|
||||
|
||||
# P2SH & BECH32 can pay to P2SH address
|
||||
tx = await self.blockchain.create_raw_transaction([
|
||||
{"txid": p2sh_txid1, "vout": 0},
|
||||
{"txid": bech32_txid1, "vout": 0},
|
||||
], [{p2sh_address3: '1.9'}]
|
||||
], {p2sh_address3: 1.9}
|
||||
)
|
||||
tx = await self.blockchain.sign_raw_transaction_with_wallet(tx)
|
||||
p2sh_txid3 = await self.blockchain.send_raw_transaction(tx)
|
||||
|
@ -71,7 +72,7 @@ class TestSegwit(CommandTestCase):
|
|||
tx = await self.blockchain.create_raw_transaction([
|
||||
{"txid": p2sh_txid2, "vout": 0},
|
||||
{"txid": bech32_txid2, "vout": 0},
|
||||
], [{bech32_address3: '1.9'}]
|
||||
], {bech32_address3: 1.9}
|
||||
)
|
||||
tx = await self.blockchain.sign_raw_transaction_with_wallet(tx)
|
||||
bech32_txid3 = await self.blockchain.send_raw_transaction(tx)
|
||||
|
@ -83,12 +84,9 @@ class TestSegwit(CommandTestCase):
|
|||
tx = await self.blockchain.create_raw_transaction([
|
||||
{"txid": p2sh_txid3, "vout": 0},
|
||||
{"txid": bech32_txid3, "vout": 0},
|
||||
], [{address: '3.5'}]
|
||||
], {address: 3.5}
|
||||
)
|
||||
tx = await self.blockchain.sign_raw_transaction_with_wallet(tx)
|
||||
txid = await self.blockchain.send_raw_transaction(tx)
|
||||
await self.on_transaction_id(txid)
|
||||
await self.generate(1)
|
||||
await self.on_transaction_id(txid)
|
||||
|
||||
await self.generate_and_wait(1, [txid])
|
||||
await self.assertBalance(self.account, '13.5')
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import asyncio
|
||||
import random
|
||||
from itertools import chain
|
||||
|
||||
import lbry.wallet.rpc.jsonrpc
|
||||
from lbry.wallet.transaction import Transaction, Output, Input
|
||||
from lbry.testcase import IntegrationTestCase
|
||||
from lbry.wallet.util import satoshis_to_coins, coins_to_satoshis
|
||||
|
@ -19,10 +19,10 @@ class BasicTransactionTests(IntegrationTestCase):
|
|||
# to the 10th receiving address for a total of 30 UTXOs on the entire account
|
||||
for i in range(10):
|
||||
notification = asyncio.ensure_future(self.on_address_update(addresses[i]))
|
||||
txid = await self.blockchain.send_to_address(addresses[i], 10)
|
||||
_ = await self.send_to_address_and_wait(addresses[i], 10)
|
||||
await notification
|
||||
notification = asyncio.ensure_future(self.on_address_update(addresses[9]))
|
||||
txid = await self.blockchain.send_to_address(addresses[9], 10)
|
||||
_ = await self.send_to_address_and_wait(addresses[9], 10)
|
||||
await notification
|
||||
|
||||
# use batching to reduce issues with send_to_address on cli
|
||||
|
@ -88,12 +88,10 @@ class BasicTransactionTests(IntegrationTestCase):
|
|||
await self.assertBalance(account2, '0.0')
|
||||
|
||||
addresses = await account1.receiving.get_addresses()
|
||||
txids = await asyncio.gather(*(
|
||||
self.blockchain.send_to_address(address, 1.1) for address in addresses[:5]
|
||||
))
|
||||
await asyncio.wait([self.on_transaction_id(txid) for txid in txids]) # mempool
|
||||
await self.blockchain.generate(1)
|
||||
await asyncio.wait([self.on_transaction_id(txid) for txid in txids]) # confirmed
|
||||
txids = []
|
||||
for address in addresses[:5]:
|
||||
txids.append(await self.send_to_address_and_wait(address, 1.1))
|
||||
await self.generate_and_wait(1, txids)
|
||||
await self.assertBalance(account1, '5.5')
|
||||
await self.assertBalance(account2, '0.0')
|
||||
|
||||
|
@ -148,11 +146,8 @@ class BasicTransactionTests(IntegrationTestCase):
|
|||
return summary
|
||||
self.conductor.spv_node.server.bp.mempool.transaction_summaries = random_summary
|
||||
# 10 unconfirmed txs, all from blockchain wallet
|
||||
sends = [self.blockchain.send_to_address(address, 10) for _ in range(10)]
|
||||
# use batching to reduce issues with send_to_address on cli
|
||||
for batch in range(0, len(sends), 10):
|
||||
txids = await asyncio.gather(*sends[batch:batch + 10])
|
||||
await asyncio.wait([self.on_transaction_id(txid) for txid in txids])
|
||||
for i in range(10):
|
||||
await self.send_to_address_and_wait(address, 10)
|
||||
remote_status = await self.ledger.network.subscribe_address(address)
|
||||
self.assertTrue(await self.ledger.update_history(address, remote_status))
|
||||
# 20 unconfirmed txs, 10 from blockchain, 10 from local to local
|
||||
|
@ -170,8 +165,7 @@ class BasicTransactionTests(IntegrationTestCase):
|
|||
remote_status = await self.ledger.network.subscribe_address(address)
|
||||
self.assertTrue(await self.ledger.update_history(address, remote_status))
|
||||
# server history grows unordered
|
||||
txid = await self.blockchain.send_to_address(address, 1)
|
||||
await self.on_transaction_id(txid)
|
||||
await self.send_to_address_and_wait(address, 1)
|
||||
self.assertTrue(await self.ledger.update_history(address, remote_status))
|
||||
self.assertEqual(21, len((await self.ledger.get_local_status_and_history(address))[1]))
|
||||
self.assertEqual(0, len(self.ledger._known_addresses_out_of_sync))
|
||||
|
@ -195,7 +189,7 @@ class BasicTransactionTests(IntegrationTestCase):
|
|||
self.ledger, 2000000000000, [self.account], set_reserved=False, return_insufficient_funds=True
|
||||
)
|
||||
got_amounts = [estimator.effective_amount for estimator in spendable]
|
||||
self.assertListEqual(amounts, got_amounts)
|
||||
self.assertListEqual(sorted(amounts), sorted(got_amounts))
|
||||
|
||||
async def test_sqlite_coin_chooser(self):
|
||||
wallet_manager = WalletManager([self.wallet], {self.ledger.get_id(): self.ledger})
|
||||
|
@ -206,26 +200,26 @@ class BasicTransactionTests(IntegrationTestCase):
|
|||
other_account = self.wallet.generate_account(self.ledger)
|
||||
other_address = await other_account.receiving.get_or_create_usable_address()
|
||||
self.ledger.coin_selection_strategy = 'sqlite'
|
||||
await self.ledger.subscribe_account(self.account)
|
||||
await self.ledger.subscribe_account(other_account)
|
||||
|
||||
accepted = asyncio.ensure_future(self.on_address_update(address))
|
||||
txid = await self.blockchain.send_to_address(address, 1.0)
|
||||
_ = await self.send_to_address_and_wait(address, 1.0)
|
||||
await accepted
|
||||
|
||||
accepted = asyncio.ensure_future(self.on_address_update(address))
|
||||
txid = await self.blockchain.send_to_address(address, 1.0)
|
||||
_ = await self.send_to_address_and_wait(address, 1.0)
|
||||
await accepted
|
||||
|
||||
accepted = asyncio.ensure_future(self.on_address_update(address))
|
||||
txid = await self.blockchain.send_to_address(address, 3.0)
|
||||
_ = await self.send_to_address_and_wait(address, 3.0)
|
||||
await accepted
|
||||
|
||||
accepted = asyncio.ensure_future(self.on_address_update(address))
|
||||
txid = await self.blockchain.send_to_address(address, 5.0)
|
||||
_ = await self.send_to_address_and_wait(address, 5.0)
|
||||
await accepted
|
||||
|
||||
accepted = asyncio.ensure_future(self.on_address_update(address))
|
||||
txid = await self.blockchain.send_to_address(address, 10.0)
|
||||
_ = await self.send_to_address_and_wait(address, 10.0)
|
||||
await accepted
|
||||
|
||||
await self.assertBalance(self.account, '20.0')
|
||||
|
@ -266,6 +260,8 @@ class BasicTransactionTests(IntegrationTestCase):
|
|||
async def broadcast(tx):
|
||||
try:
|
||||
return await real_broadcast(tx)
|
||||
except lbry.wallet.rpc.jsonrpc.RPCError:
|
||||
pass
|
||||
finally:
|
||||
e.set()
|
||||
|
||||
|
|
Loading…
Reference in a new issue