This commit is contained in:
Jack Robison 2022-01-12 11:59:44 -05:00
parent e3a4dab6cb
commit 4f4ecd64cc
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
13 changed files with 180 additions and 146 deletions

View file

@ -285,6 +285,17 @@ class IntegrationTestCase(AsyncioTestCase):
lambda e: e.tx.id == tx.id and e.address == address
)
async def generate(self, blocks):
""" Ask lbrycrd to generate some blocks and wait until ledger has them. """
print("generate", blocks)
prepare = self.ledger.on_header.where(self.blockchain.is_expected_block)
self.conductor.spv_node.server.synchronized.clear()
await self.blockchain.generate(blocks)
await prepare # no guarantee that it didn't happen already, so start waiting from before calling generate
print("wait for synchronized")
await self.conductor.spv_node.server.synchronized.wait()
print("finished waiting for synchronized")
class FakeExchangeRateManager(ExchangeRateManager):
@ -456,8 +467,7 @@ class CommandTestCase(IntegrationTestCase):
async def confirm_tx(self, txid, ledger=None):
""" Wait for tx to be in mempool, then generate a block, wait for tx to be in a block. """
await self.on_transaction_id(txid, ledger)
await self.generate(1)
await self.on_transaction_id(txid, ledger)
await asyncio.wait([self.generate(1), self.on_transaction_id(txid, ledger)], timeout=5)
return txid
async def on_transaction_dict(self, tx):
@ -472,12 +482,6 @@ class CommandTestCase(IntegrationTestCase):
addresses.add(txo['address'])
return list(addresses)
async def generate(self, blocks):
""" Ask lbrycrd to generate some blocks and wait until ledger has them. """
prepare = self.ledger.on_header.where(self.blockchain.is_expected_block)
await self.blockchain.generate(blocks)
await prepare # no guarantee that it didn't happen already, so start waiting from before calling generate
async def blockchain_claim_name(self, name: str, value: str, amount: str, confirm=True):
txid = await self.blockchain._cli_cmnd('claimname', name, value, amount)
if confirm:

View file

@ -1,5 +1,5 @@
__hub_url__ = (
"https://github.com/lbryio/hub/releases/download/v0.2021.12.18.1/hub"
)
from .node import Conductor
from .service import ConductorService
from lbry.wallet.orchstr8.node import Conductor
from lbry.wallet.orchstr8.service import ConductorService

View file

@ -16,11 +16,13 @@ import urllib.request
from uuid import uuid4
import lbry
from lbry.wallet.server.server import Server
from lbry.wallet.server.env import Env
from lbry.wallet import Wallet, Ledger, RegTestLedger, WalletManager, Account, BlockHeightEvent
from lbry.conf import KnownHubsList, Config
from lbry.wallet.orchstr8 import __hub_url__
from lbry.wallet.server.block_processor import BlockProcessor
from lbry.wallet.server.chain_reader import BlockchainReaderServer
from lbry.wallet.server.db.elasticsearch.sync import ElasticWriter
log = logging.getLogger(__name__)
@ -189,49 +191,48 @@ class SPVNode:
self.coin_class = coin_class
self.controller = None
self.data_path = None
self.server = None
self.server: Optional[BlockchainReaderServer] = None
self.writer: Optional[BlockProcessor] = None
self.es_writer: Optional[ElasticWriter] = None
self.hostname = 'localhost'
self.port = 50001 + node_number # avoid conflict with default daemon
self.udp_port = self.port
self.session_timeout = 600
self.rpc_port = '0' # disabled by default
self.stopped = False
self.index_name = uuid4().hex
async def start(self, blockchain_node: 'BlockchainNode', extraconf=None):
self.data_path = tempfile.mkdtemp()
conf = {
'DESCRIPTION': '',
'PAYMENT_ADDRESS': '',
'DAILY_FEE': '0',
'DB_DIRECTORY': self.data_path,
'DAEMON_URL': blockchain_node.rpc_url,
'REORG_LIMIT': '100',
'HOST': self.hostname,
'TCP_PORT': str(self.port),
'UDP_PORT': str(self.udp_port),
'SESSION_TIMEOUT': str(self.session_timeout),
'MAX_QUERY_WORKERS': '0',
'INDIVIDUAL_TAG_INDEXES': '',
'RPC_PORT': self.rpc_port,
'ES_INDEX_PREFIX': self.index_name,
'ES_MODE': 'writer',
'description': '',
'payment_address': '',
'daily_fee': '0',
'db_dir': self.data_path,
'daemon_url': blockchain_node.rpc_url,
'reorg_limit': 100,
'host': self.hostname,
'tcp_port': self.port,
'udp_port': self.udp_port,
'session_timeout': self.session_timeout,
'max_query_workers': 0,
'es_index_prefix': self.index_name,
}
if extraconf:
conf.update(extraconf)
# TODO: don't use os.environ
os.environ.update(conf)
self.server = Server(Env(self.coin_class))
self.server.bp.mempool.refresh_secs = self.server.bp.prefetcher.polling_delay = 0.5
await self.server.start()
self.writer = BlockProcessor(Env(self.coin_class, es_mode='writer', **conf))
self.server = BlockchainReaderServer(Env(self.coin_class, es_mode='reader', **conf))
self.es_writer = ElasticWriter(Env(self.coin_class, es_mode='reader', **conf))
await self.writer.open()
await self.writer.start()
await asyncio.wait([self.server.start(), self.es_writer.start()])
async def stop(self, cleanup=True):
if self.stopped:
return
try:
await self.server.db.search_index.delete_index()
await self.server.db.search_index.stop()
await self.es_writer.stop(delete_index=True)
await self.server.stop()
await self.writer.stop()
self.stopped = True
finally:
cleanup and self.cleanup()

View file

@ -1,12 +1,12 @@
#!/bin/bash
SNAPSHOT_HEIGHT="1049658"
SNAPSHOT_HEIGHT="1072108"
HUB_VOLUME_PATH="/var/lib/docker/volumes/${USER}_wallet_server"
ES_VOLUME_PATH="/var/lib/docker/volumes/${USER}_es01"
SNAPSHOT_TAR_NAME="wallet_server_snapshot_${SNAPSHOT_HEIGHT}.tar"
ES_SNAPSHOT_TAR_NAME="es_snapshot_${SNAPSHOT_HEIGHT}.tar"
SNAPSHOT_TAR_NAME="wallet_server_snapshot_${SNAPSHOT_HEIGHT}.tar.gz"
ES_SNAPSHOT_TAR_NAME="es_snapshot_${SNAPSHOT_HEIGHT}.tar.gz"
SNAPSHOT_URL="https://snapshots.lbry.com/hub/${SNAPSHOT_TAR_NAME}"
ES_SNAPSHOT_URL="https://snapshots.lbry.com/hub/${ES_SNAPSHOT_TAR_NAME}"

View file

@ -36,7 +36,7 @@ class BlockchainReorganizationTests(CommandTestCase):
self.assertEqual(self.ledger.headers.height, height)
await self.assertBlockHash(height)
await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode())
await self.blockchain.generate(2)
await self.generate(2)
await self.ledger.on_header.where(lambda e: e.height == 207)
self.assertEqual(self.ledger.headers.height, 207)
await self.assertBlockHash(206)
@ -45,14 +45,14 @@ class BlockchainReorganizationTests(CommandTestCase):
# invalidate current block, move forward 3
await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode())
await self.blockchain.generate(3)
await self.generate(3)
await self.ledger.on_header.where(lambda e: e.height == 208)
self.assertEqual(self.ledger.headers.height, 208)
await self.assertBlockHash(206)
await self.assertBlockHash(207)
await self.assertBlockHash(208)
self.assertEqual(2, bp.reorg_count_metric._samples()[0][2])
await self.blockchain.generate(3)
await self.generate(3)
await self.ledger.on_header.where(lambda e: e.height == 211)
await self.assertBlockHash(209)
await self.assertBlockHash(210)
@ -61,7 +61,7 @@ class BlockchainReorganizationTests(CommandTestCase):
'still-valid', '1.0', file_path=self.create_upload_file(data=b'hi!')
)
await self.ledger.wait(still_valid)
await self.blockchain.generate(1)
await self.generate(1)
await self.ledger.on_header.where(lambda e: e.height == 212)
claim_id = still_valid.outputs[0].claim_id
c1 = (await self.resolve(f'still-valid#{claim_id}'))['claim_id']
@ -70,7 +70,7 @@ class BlockchainReorganizationTests(CommandTestCase):
self.assertTrue(c1 == c2 == c3)
abandon_tx = await self.daemon.jsonrpc_stream_abandon(claim_id=claim_id)
await self.blockchain.generate(1)
await self.generate(1)
await self.ledger.on_header.where(lambda e: e.height == 213)
c1 = await self.resolve(f'still-valid#{still_valid.outputs[0].claim_id}')
c2 = await self.daemon.jsonrpc_resolve([f'still-valid#{claim_id[:2]}'])
@ -113,7 +113,7 @@ class BlockchainReorganizationTests(CommandTestCase):
# reorg the last block dropping our claim tx
await self.blockchain.invalidate_block(invalidated_block_hash)
await self.blockchain.clear_mempool()
await self.blockchain.generate(2)
await self.generate(2)
# wait for the client to catch up and verify the reorg
await asyncio.wait_for(self.on_header(209), 3.0)
@ -142,7 +142,7 @@ class BlockchainReorganizationTests(CommandTestCase):
# broadcast the claim in a different block
new_txid = await self.blockchain.sendrawtransaction(hexlify(broadcast_tx.raw).decode())
self.assertEqual(broadcast_tx.id, new_txid)
await self.blockchain.generate(1)
await self.generate(1)
# wait for the client to catch up
await asyncio.wait_for(self.on_header(210), 1.0)
@ -192,7 +192,7 @@ class BlockchainReorganizationTests(CommandTestCase):
# reorg the last block dropping our claim tx
await self.blockchain.invalidate_block(invalidated_block_hash)
await self.blockchain.clear_mempool()
await self.blockchain.generate(2)
await self.generate(2)
# wait for the client to catch up and verify the reorg
await asyncio.wait_for(self.on_header(209), 3.0)
@ -221,7 +221,7 @@ class BlockchainReorganizationTests(CommandTestCase):
# broadcast the claim in a different block
new_txid = await self.blockchain.sendrawtransaction(hexlify(broadcast_tx.raw).decode())
self.assertEqual(broadcast_tx.id, new_txid)
await self.blockchain.generate(1)
await self.generate(1)
# wait for the client to catch up
await asyncio.wait_for(self.on_header(210), 1.0)

View file

@ -16,7 +16,7 @@ class NetworkTests(IntegrationTestCase):
async def test_remote_height_updated_automagically(self):
initial_height = self.ledger.network.remote_height
await self.blockchain.generate(1)
await self.generate(1)
await self.ledger.network.on_header.first
self.assertEqual(self.ledger.network.remote_height, initial_height + 1)
@ -85,8 +85,8 @@ class ReconnectTests(IntegrationTestCase):
async def test_direct_sync(self):
await self.ledger.stop()
initial_height = self.ledger.local_height_including_downloaded_height
await self.blockchain.generate(100)
while self.conductor.spv_node.server.session_mgr.notified_height < initial_height + 99: # off by 1
await self.generate(100)
while self.conductor.spv_node.server.session_manager.notified_height < initial_height + 99: # off by 1
await asyncio.sleep(0.1)
self.assertEqual(initial_height, self.ledger.local_height_including_downloaded_height)
await self.ledger.headers.open()
@ -105,7 +105,7 @@ class ReconnectTests(IntegrationTestCase):
# await self.ledger.resolve([], 'derp')
# self.assertTrue(self.ledger.network.is_connected)
await asyncio.wait_for(self.on_transaction_id(sendtxid), 10.0) # mempool
await self.blockchain.generate(1)
await self.generate(1)
await self.on_transaction_id(sendtxid) # confirmed
self.assertLess(self.ledger.network.client.response_time, 1) # response time properly set lower, we are fine
@ -123,7 +123,7 @@ class ReconnectTests(IntegrationTestCase):
await self.ledger.network.get_transaction(sendtxid)
# * goes to pick some water outside... * time passes by and another donation comes in
sendtxid = await self.blockchain.send_to_address(address1, 42)
await self.blockchain.generate(1)
await self.generate(1)
# (this is just so the test doesn't hang forever if it doesn't reconnect)
if not self.ledger.network.is_connected:
await asyncio.wait_for(self.ledger.network.on_connected.first, timeout=10.0)
@ -169,7 +169,7 @@ class UDPServerFailDiscoveryTest(AsyncioTestCase):
self.addCleanup(conductor.stop_blockchain)
await conductor.start_spv()
self.addCleanup(conductor.stop_spv)
self.assertFalse(conductor.spv_node.server.bp.status_server.is_running)
self.assertFalse(conductor.spv_node.server.reader.status_server.is_running)
await asyncio.wait_for(conductor.start_wallet(), timeout=5)
self.addCleanup(conductor.stop_wallet)
self.assertTrue(conductor.wallet_node.ledger.network.is_connected)

View file

@ -63,7 +63,7 @@ class SyncTests(IntegrationTestCase):
await self.assertBalance(account1, '1.0')
await self.assertBalance(account2, '1.0')
await self.blockchain.generate(1)
await self.generate(1)
# pay 0.01 from main node to receiving node, would have increased change addresses
address0 = (await account0.receiving.get_addresses())[0]
@ -79,7 +79,7 @@ class SyncTests(IntegrationTestCase):
account1.ledger.wait(tx),
account2.ledger.wait(tx),
])
await self.blockchain.generate(1)
await self.generate(1)
await asyncio.wait([
account0.ledger.wait(tx),
account1.ledger.wait(tx),
@ -92,7 +92,7 @@ class SyncTests(IntegrationTestCase):
await self.assertBalance(account1, '0.989876')
await self.assertBalance(account2, '0.989876')
await self.blockchain.generate(1)
await self.generate(1)
# create a new mirror node and see if it syncs to same balance from scratch
node3 = await self.make_wallet_node(account1.seed)

View file

@ -11,7 +11,7 @@ from lbry.wallet.dewies import dict_values_to_lbc
class WalletCommands(CommandTestCase):
async def test_wallet_create_and_add_subscribe(self):
session = next(iter(self.conductor.spv_node.server.session_mgr.sessions.values()))
session = next(iter(self.conductor.spv_node.server.session_manager.sessions.values()))
self.assertEqual(len(session.hashX_subs), 27)
wallet = await self.daemon.jsonrpc_wallet_create('foo', create_account=True, single_key=True)
self.assertEqual(len(session.hashX_subs), 28)

View file

@ -25,13 +25,13 @@ class TestSessions(IntegrationTestCase):
)
await session.create_connection()
await session.send_request('server.banner', ())
self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 1)
self.assertEqual(len(self.conductor.spv_node.server.session_manager.sessions), 1)
self.assertFalse(session.is_closing())
await asyncio.sleep(1.1)
with self.assertRaises(asyncio.TimeoutError):
await session.send_request('server.banner', ())
self.assertTrue(session.is_closing())
self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 0)
self.assertEqual(len(self.conductor.spv_node.server.session_manager.sessions), 0)
async def test_proper_version(self):
info = await self.ledger.network.get_server_features()
@ -186,7 +186,7 @@ class TestHubDiscovery(CommandTestCase):
self.daemon.ledger.network.client.server_address_and_port, ('127.0.0.1', kp_final_node.port)
)
kp_final_node.server.session_mgr._notify_peer('127.0.0.1:9988')
kp_final_node.server.session_manager._notify_peer('127.0.0.1:9988')
await self.daemon.ledger.network.on_hub.first
await asyncio.sleep(0.5) # wait for above event to be processed by other listeners
self.assertEqual(

View file

@ -23,7 +23,7 @@ class BaseResolveTestCase(CommandTestCase):
def assertMatchESClaim(self, claim_from_es, claim_from_db):
self.assertEqual(claim_from_es['claim_hash'][::-1].hex(), claim_from_db.claim_hash.hex())
self.assertEqual(claim_from_es['claim_id'], claim_from_db.claim_hash.hex())
self.assertEqual(claim_from_es['activation_height'], claim_from_db.activation_height)
self.assertEqual(claim_from_es['activation_height'], claim_from_db.activation_height, f"es height: {claim_from_es['activation_height']}, rocksdb height: {claim_from_db.activation_height}")
self.assertEqual(claim_from_es['last_take_over_height'], claim_from_db.last_takeover_height)
self.assertEqual(claim_from_es['tx_id'], claim_from_db.tx_hash[::-1].hex())
self.assertEqual(claim_from_es['tx_nout'], claim_from_db.position)
@ -44,44 +44,44 @@ class BaseResolveTestCase(CommandTestCase):
if claim_id is None:
self.assertIn('error', other)
self.assertEqual(other['error']['name'], 'NOT_FOUND')
claims_from_es = (await self.conductor.spv_node.server.bp.db.search_index.search(name=name))[0]
claims_from_es = (await self.conductor.spv_node.server.session_manager.search_index.search(name=name))[0]
claims_from_es = [c['claim_hash'][::-1].hex() for c in claims_from_es]
self.assertNotIn(claim_id, claims_from_es)
else:
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(claim_id=claim_id)
claim_from_es = await self.conductor.spv_node.server.session_manager.search_index.search(claim_id=claim_id)
self.assertEqual(claim_id, other['claim_id'])
self.assertEqual(claim_id, claim_from_es[0][0]['claim_hash'][::-1].hex())
async def assertNoClaimForName(self, name: str):
lbrycrd_winning = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))
stream, channel, _, _ = await self.conductor.spv_node.server.bp.db.resolve(name)
stream, channel, _, _ = await self.conductor.spv_node.server.db.resolve(name)
self.assertNotIn('claimId', lbrycrd_winning)
if stream is not None:
self.assertIsInstance(stream, LookupError)
else:
self.assertIsInstance(channel, LookupError)
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(name=name)
claim_from_es = await self.conductor.spv_node.server.session_manager.search_index.search(name=name)
self.assertListEqual([], claim_from_es[0])
async def assertNoClaim(self, claim_id: str):
self.assertDictEqual(
{}, json.loads(await self.blockchain._cli_cmnd('getclaimbyid', claim_id))
)
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(claim_id=claim_id)
claim_from_es = await self.conductor.spv_node.server.session_manager.search_index.search(claim_id=claim_id)
self.assertListEqual([], claim_from_es[0])
claim = await self.conductor.spv_node.server.bp.db.fs_getclaimbyid(claim_id)
claim = await self.conductor.spv_node.server.db.fs_getclaimbyid(claim_id)
self.assertIsNone(claim)
async def assertMatchWinningClaim(self, name):
expected = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))
stream, channel, _, _ = await self.conductor.spv_node.server.bp.db.resolve(name)
stream, channel, _, _ = await self.conductor.spv_node.server.db.resolve(name)
claim = stream if stream else channel
await self._assertMatchClaim(expected, claim)
return claim
async def _assertMatchClaim(self, expected, claim):
self.assertMatchDBClaim(expected, claim)
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(
claim_from_es = await self.conductor.spv_node.server.session_manager.search_index.search(
claim_id=claim.claim_hash.hex()
)
self.assertEqual(len(claim_from_es[0]), 1)
@ -90,7 +90,7 @@ class BaseResolveTestCase(CommandTestCase):
async def assertMatchClaim(self, claim_id, is_active_in_lbrycrd=True):
expected = json.loads(await self.blockchain._cli_cmnd('getclaimbyid', claim_id))
claim = await self.conductor.spv_node.server.bp.db.fs_getclaimbyid(claim_id)
claim = await self.conductor.spv_node.server.db.fs_getclaimbyid(claim_id)
if is_active_in_lbrycrd:
if not expected:
self.assertIsNone(claim)
@ -98,7 +98,7 @@ class BaseResolveTestCase(CommandTestCase):
self.assertMatchDBClaim(expected, claim)
else:
self.assertDictEqual({}, expected)
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(
claim_from_es = await self.conductor.spv_node.server.session_manager.search_index.search(
claim_id=claim.claim_hash.hex()
)
self.assertEqual(len(claim_from_es[0]), 1)
@ -116,7 +116,7 @@ class BaseResolveTestCase(CommandTestCase):
def _check_supports(self, claim_id, lbrycrd_supports, es_support_amount, is_active_in_lbrycrd=True):
total_amount = 0
db = self.conductor.spv_node.server.bp.db
db = self.conductor.spv_node.server.db
for i, (tx_num, position, amount) in enumerate(db.get_supports(bytes.fromhex(claim_id))):
total_amount += amount
@ -131,7 +131,7 @@ class BaseResolveTestCase(CommandTestCase):
async def assertMatchClaimsForName(self, name):
expected = json.loads(await self.blockchain._cli_cmnd('getclaimsforname', name))
db = self.conductor.spv_node.server.bp.db
db = self.conductor.spv_node.server.db
# self.assertEqual(len(expected['claims']), len(db_claims.claims))
# self.assertEqual(expected['lastTakeoverHeight'], db_claims.lastTakeoverHeight)
last_takeover = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))['lastTakeoverHeight']
@ -143,7 +143,7 @@ class BaseResolveTestCase(CommandTestCase):
claim = db._fs_get_claim_by_hash(claim_hash)
self.assertMatchDBClaim(c, claim)
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(
claim_from_es = await self.conductor.spv_node.server.session_manager.search_index.search(
claim_id=c['claimId']
)
self.assertEqual(len(claim_from_es[0]), 1)
@ -151,6 +151,17 @@ class BaseResolveTestCase(CommandTestCase):
self.assertMatchESClaim(claim_from_es[0][0], claim)
self._check_supports(c['claimId'], c['supports'], claim_from_es[0][0]['support_amount'])
async def assertNameState(self, height: int, name: str, winning_claim_id: str, last_takeover_height: int,
non_winning_claims: List[ClaimStateValue]):
self.assertEqual(height, self.conductor.spv_node.server.db.db_height)
await self.assertMatchClaimIsWinning(name, winning_claim_id)
for non_winning in non_winning_claims:
claim = await self.assertMatchClaim(
non_winning.claim_id, is_active_in_lbrycrd=non_winning.active_in_lbrycrd
)
self.assertEqual(non_winning.activation_height, claim.activation_height)
self.assertEqual(last_takeover_height, claim.last_takeover_height)
class ResolveCommand(BaseResolveTestCase):
async def test_colliding_short_id(self):
@ -641,17 +652,6 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
async def create_stream_claim(self, amount: str, name='derp') -> str:
return (await self.stream_create(name, amount, allow_duplicate_name=True))['outputs'][0]['claim_id']
async def assertNameState(self, height: int, name: str, winning_claim_id: str, last_takeover_height: int,
non_winning_claims: List[ClaimStateValue]):
self.assertEqual(height, self.conductor.spv_node.server.bp.db.db_height)
await self.assertMatchClaimIsWinning(name, winning_claim_id)
for non_winning in non_winning_claims:
claim = await self.assertMatchClaim(
non_winning.claim_id, is_active_in_lbrycrd=non_winning.active_in_lbrycrd
)
self.assertEqual(non_winning.activation_height, claim.activation_height)
self.assertEqual(last_takeover_height, claim.last_takeover_height)
async def test_delay_takeover_with_update(self):
name = 'derp'
first_claim_id = await self.create_stream_claim('0.2', name)
@ -961,7 +961,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
)
greater_than_or_equal_to_zero = [
claim['claim_id'] for claim in (
await self.conductor.spv_node.server.bp.db.search_index.search(
await self.conductor.spv_node.server.session_manager.search_index.search(
channel_id=channel_id, fee_amount=">=0"
))[0]
]
@ -969,7 +969,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
self.assertSetEqual(set(greater_than_or_equal_to_zero), {stream_with_no_fee, stream_with_fee})
greater_than_zero = [
claim['claim_id'] for claim in (
await self.conductor.spv_node.server.bp.db.search_index.search(
await self.conductor.spv_node.server.session_manager.search_index.search(
channel_id=channel_id, fee_amount=">0"
))[0]
]
@ -977,7 +977,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
self.assertSetEqual(set(greater_than_zero), {stream_with_fee})
equal_to_zero = [
claim['claim_id'] for claim in (
await self.conductor.spv_node.server.bp.db.search_index.search(
await self.conductor.spv_node.server.session_manager.search_index.search(
channel_id=channel_id, fee_amount="<=0"
))[0]
]
@ -995,7 +995,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
await self.blockchain.send_to_address(address, 400.0)
await self.account.ledger.on_address.first
await self.generate(100)
self.assertEqual(800, self.conductor.spv_node.server.bp.db.db_height)
self.assertEqual(800, self.conductor.spv_node.server.db.db_height)
# Block 801: Claim A for 10 LBC is accepted.
# It is the first claim, so it immediately becomes active and controlling.
@ -1007,10 +1007,10 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
# Its activation height is 1121 + min(4032, floor((1121-801) / 32)) = 1121 + 10 = 1131.
# State: A(10) is controlling, B(20) is accepted.
await self.generate(32 * 10 - 1)
self.assertEqual(1120, self.conductor.spv_node.server.bp.db.db_height)
self.assertEqual(1120, self.conductor.spv_node.server.db.db_height)
claim_id_B = (await self.stream_create(name, '20.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
claim_B, _, _, _ = await self.conductor.spv_node.server.bp.db.resolve(f"{name}:{claim_id_B}")
self.assertEqual(1121, self.conductor.spv_node.server.bp.db.db_height)
claim_B, _, _, _ = await self.conductor.spv_node.server.db.resolve(f"{name}:{claim_id_B}")
self.assertEqual(1121, self.conductor.spv_node.server.db.db_height)
self.assertEqual(1131, claim_B.activation_height)
await self.assertMatchClaimIsWinning(name, claim_id_A)
@ -1018,33 +1018,33 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
# Since it is a support for the controlling claim, it activates immediately.
# State: A(10+14) is controlling, B(20) is accepted.
await self.support_create(claim_id_A, bid='14.0')
self.assertEqual(1122, self.conductor.spv_node.server.bp.db.db_height)
self.assertEqual(1122, self.conductor.spv_node.server.db.db_height)
await self.assertMatchClaimIsWinning(name, claim_id_A)
# Block 1123: Claim C for 50 LBC is accepted.
# The activation height is 1123 + min(4032, floor((1123-801) / 32)) = 1123 + 10 = 1133.
# State: A(10+14) is controlling, B(20) is accepted, C(50) is accepted.
claim_id_C = (await self.stream_create(name, '50.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
self.assertEqual(1123, self.conductor.spv_node.server.bp.db.db_height)
claim_C, _, _, _ = await self.conductor.spv_node.server.bp.db.resolve(f"{name}:{claim_id_C}")
self.assertEqual(1123, self.conductor.spv_node.server.db.db_height)
claim_C, _, _, _ = await self.conductor.spv_node.server.db.resolve(f"{name}:{claim_id_C}")
self.assertEqual(1133, claim_C.activation_height)
await self.assertMatchClaimIsWinning(name, claim_id_A)
await self.generate(7)
self.assertEqual(1130, self.conductor.spv_node.server.bp.db.db_height)
self.assertEqual(1130, self.conductor.spv_node.server.db.db_height)
await self.assertMatchClaimIsWinning(name, claim_id_A)
await self.generate(1)
# Block 1131: Claim B activates. It has 20 LBC, while claim A has 24 LBC (10 original + 14 from support X). There is no takeover, and claim A remains controlling.
# State: A(10+14) is controlling, B(20) is active, C(50) is accepted.
self.assertEqual(1131, self.conductor.spv_node.server.bp.db.db_height)
self.assertEqual(1131, self.conductor.spv_node.server.db.db_height)
await self.assertMatchClaimIsWinning(name, claim_id_A)
# Block 1132: Claim D for 300 LBC is accepted. The activation height is 1132 + min(4032, floor((1132-801) / 32)) = 1132 + 10 = 1142.
# State: A(10+14) is controlling, B(20) is active, C(50) is accepted, D(300) is accepted.
claim_id_D = (await self.stream_create(name, '300.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
self.assertEqual(1132, self.conductor.spv_node.server.bp.db.db_height)
claim_D, _, _, _ = await self.conductor.spv_node.server.bp.db.resolve(f"{name}:{claim_id_D}")
self.assertEqual(1132, self.conductor.spv_node.server.db.db_height)
claim_D, _, _, _ = await self.conductor.spv_node.server.db.resolve(f"{name}:{claim_id_D}")
self.assertEqual(False, claim_D.is_controlling)
self.assertEqual(801, claim_D.last_takeover_height)
self.assertEqual(1142, claim_D.activation_height)
@ -1053,8 +1053,8 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
# Block 1133: Claim C activates. It has 50 LBC, while claim A has 24 LBC, so a takeover is initiated. The takeover height for this name is set to 1133, and therefore the activation delay for all the claims becomes min(4032, floor((1133-1133) / 32)) = 0. All the claims become active. The totals for each claim are recalculated, and claim D becomes controlling because it has the highest total.
# State: A(10+14) is active, B(20) is active, C(50) is active, D(300) is controlling
await self.generate(1)
self.assertEqual(1133, self.conductor.spv_node.server.bp.db.db_height)
claim_D, _, _, _ = await self.conductor.spv_node.server.bp.db.resolve(f"{name}:{claim_id_D}")
self.assertEqual(1133, self.conductor.spv_node.server.db.db_height)
claim_D, _, _, _ = await self.conductor.spv_node.server.db.resolve(f"{name}:{claim_id_D}")
self.assertEqual(True, claim_D.is_controlling)
self.assertEqual(1133, claim_D.last_takeover_height)
self.assertEqual(1133, claim_D.activation_height)
@ -1407,12 +1407,12 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
second_claim_id = (await self.stream_create(name, '0.01', allow_duplicate_name=True))['outputs'][0]['claim_id']
await self.assertNoClaim(second_claim_id)
self.assertEqual(
len((await self.conductor.spv_node.server.bp.db.search_index.search(claim_name=name))[0]), 1
len((await self.conductor.spv_node.server.session_manager.search_index.search(claim_name=name))[0]), 1
)
await self.generate(1)
await self.assertMatchClaim(second_claim_id)
self.assertEqual(
len((await self.conductor.spv_node.server.bp.db.search_index.search(claim_name=name))[0]), 2
len((await self.conductor.spv_node.server.session_manager.search_index.search(claim_name=name))[0]), 2
)
async def test_abandon_controlling_same_block_as_new_claim(self):
@ -1428,7 +1428,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
async def test_trending(self):
async def get_trending_score(claim_id):
return (await self.conductor.spv_node.server.bp.db.search_index.search(
return (await self.conductor.spv_node.server.session_manager.search_index.search(
claim_id=claim_id
))[0][0]['trending_score']
@ -1456,7 +1456,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
)
await self.generate(1)
self.assertEqual(-174.951347102643, await get_trending_score(claim_id1))
search_results = (await self.conductor.spv_node.server.bp.db.search_index.search(claim_name="derp"))[0]
search_results = (await self.conductor.spv_node.server.session_manager.search_index.search(claim_name="derp"))[0]
self.assertEqual(1, len(search_results))
self.assertListEqual([claim_id1], [c['claim_id'] for c in search_results])
@ -1465,22 +1465,31 @@ class ResolveAfterReorg(BaseResolveTestCase):
async def reorg(self, start):
blocks = self.ledger.headers.height - start
self.blockchain.block_expected = start - 1
prepare = self.ledger.on_header.where(self.blockchain.is_expected_block)
self.conductor.spv_node.server.synchronized.clear()
# go back to start
await self.blockchain.invalidate_block((await self.ledger.headers.hash(start)).decode())
print('invalidate block', await self.blockchain.invalidate_block((await self.ledger.headers.hash(start)).decode()))
# go to previous + 1
await self.generate(blocks + 2)
await self.blockchain.generate(blocks + 2)
await prepare # no guarantee that it didn't happen already, so start waiting from before calling generate
await self.conductor.spv_node.server.synchronized.wait()
# await asyncio.wait_for(self.on_header(self.blockchain.block_expected), 30.0)
async def assertBlockHash(self, height):
bp = self.conductor.spv_node.server.bp
reader_db = self.conductor.spv_node.server.db
block_hash = await self.blockchain.get_block_hash(height)
self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode())
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
self.assertEqual(block_hash, (await reader_db.fs_block_hashes(height, 1))[0][::-1].hex())
txids = [
tx_hash[::-1].hex() for tx_hash in bp.db.get_block_txs(height)
tx_hash[::-1].hex() for tx_hash in reader_db.get_block_txs(height)
]
txs = await bp.db.get_transactions_and_merkles(txids)
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
txs = await reader_db.get_transactions_and_merkles(txids)
block_txs = (await self.conductor.spv_node.server.daemon.deserialised_block(block_hash))['tx']
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')
@ -1491,9 +1500,18 @@ class ResolveAfterReorg(BaseResolveTestCase):
channel_id = self.get_claim_id(
await self.channel_create(channel_name, '0.01')
)
self.assertEqual(channel_id, (await self.assertMatchWinningClaim(channel_name)).claim_hash.hex())
await self.assertNameState(
height=207, name='@abc', winning_claim_id=channel_id, last_takeover_height=207,
non_winning_claims=[]
)
await self.reorg(206)
self.assertEqual(channel_id, (await self.assertMatchWinningClaim(channel_name)).claim_hash.hex())
await self.assertNameState(
height=208, name='@abc', winning_claim_id=channel_id, last_takeover_height=207,
non_winning_claims=[]
)
# await self.assertNoClaimForName(channel_name)
# self.assertNotIn('error', await self.resolve(channel_name))
@ -1502,16 +1520,29 @@ class ResolveAfterReorg(BaseResolveTestCase):
stream_id = self.get_claim_id(
await self.stream_create(stream_name, '0.01', channel_id=channel_id)
)
self.assertEqual(stream_id, (await self.assertMatchWinningClaim(stream_name)).claim_hash.hex())
await self.assertNameState(
height=209, name=stream_name, winning_claim_id=stream_id, last_takeover_height=209,
non_winning_claims=[]
)
await self.reorg(206)
self.assertEqual(stream_id, (await self.assertMatchWinningClaim(stream_name)).claim_hash.hex())
await self.assertNameState(
height=210, name=stream_name, winning_claim_id=stream_id, last_takeover_height=209,
non_winning_claims=[]
)
await self.support_create(stream_id, '0.01')
self.assertNotIn('error', await self.resolve(stream_name))
self.assertEqual(stream_id, (await self.assertMatchWinningClaim(stream_name)).claim_hash.hex())
await self.assertNameState(
height=211, name=stream_name, winning_claim_id=stream_id, last_takeover_height=209,
non_winning_claims=[]
)
await self.reorg(206)
# self.assertNotIn('error', await self.resolve(stream_name))
self.assertEqual(stream_id, (await self.assertMatchWinningClaim(stream_name)).claim_hash.hex())
await self.assertNameState(
height=212, name=stream_name, winning_claim_id=stream_id, last_takeover_height=209,
non_winning_claims=[]
)
await self.stream_abandon(stream_id)
self.assertNotIn('error', await self.resolve(channel_name))
@ -1553,7 +1584,6 @@ class ResolveAfterReorg(BaseResolveTestCase):
await self.ledger.wait(broadcast_tx)
await self.support_create(still_valid.outputs[0].claim_id, '0.01')
# await self.generate(1)
await self.ledger.wait(broadcast_tx, self.blockchain.block_expected)
self.assertEqual(self.ledger.headers.height, 208)
await self.assertBlockHash(208)
@ -1574,7 +1604,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
await self.blockchain.generate(2)
# wait for the client to catch up and verify the reorg
await asyncio.wait_for(self.on_header(209), 3.0)
await asyncio.wait_for(self.on_header(209), 30.0)
await self.assertBlockHash(207)
await self.assertBlockHash(208)
await self.assertBlockHash(209)
@ -1603,7 +1633,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
await self.blockchain.generate(1)
# wait for the client to catch up
await asyncio.wait_for(self.on_header(210), 1.0)
await asyncio.wait_for(self.on_header(210), 30.0)
# verify the claim is in the new block and that it is returned by claim_search
republished = await self.resolve('hovercraft')
@ -1653,7 +1683,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
await self.blockchain.generate(2)
# wait for the client to catch up and verify the reorg
await asyncio.wait_for(self.on_header(209), 3.0)
await asyncio.wait_for(self.on_header(209), 30.0)
await self.assertBlockHash(207)
await self.assertBlockHash(208)
await self.assertBlockHash(209)
@ -1682,7 +1712,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
await self.blockchain.generate(1)
# wait for the client to catch up
await asyncio.wait_for(self.on_header(210), 1.0)
await asyncio.wait_for(self.on_header(210), 10.0)
# verify the claim is in the new block and that it is returned by claim_search
republished = await self.resolve('hovercraft')

View file

@ -23,7 +23,7 @@ class BasicTransactionTest(IntegrationTestCase):
))
sendtxid1 = await self.blockchain.send_to_address(address1, 5)
sendtxid2 = await self.blockchain.send_to_address(address2, 5)
await self.blockchain.generate(1)
await self.generate(1)
await notifications
self.assertEqual(d2l(await self.account.get_balance()), '10.0')
@ -57,7 +57,7 @@ class BasicTransactionTest(IntegrationTestCase):
notifications = asyncio.create_task(asyncio.wait(
[asyncio.ensure_future(self.ledger.wait(channel_tx)), asyncio.ensure_future(self.ledger.wait(stream_tx))]
))
await self.blockchain.generate(1)
await self.generate(1)
await notifications
self.assertEqual(d2l(await self.account.get_balance()), '7.985786')
self.assertEqual(d2l(await self.account.get_balance(include_claims=True)), '9.985786')
@ -70,7 +70,7 @@ class BasicTransactionTest(IntegrationTestCase):
await self.broadcast(abandon_tx)
await notify
notify = asyncio.create_task(self.ledger.wait(abandon_tx))
await self.blockchain.generate(1)
await self.generate(1)
await notify
response = await self.ledger.resolve([], ['lbry://@bar/foo'])

View file

@ -9,9 +9,8 @@ from lbry.wallet.manager import WalletManager
class BasicTransactionTests(IntegrationTestCase):
async def test_variety_of_transactions_and_longish_history(self):
await self.blockchain.generate(300)
await self.generate(300)
await self.assertBalance(self.account, '0.0')
addresses = await self.account.receiving.get_addresses()
@ -57,7 +56,7 @@ class BasicTransactionTests(IntegrationTestCase):
for tx in await self.ledger.db.get_transactions(txid__in=[tx.id for tx in txs])
]))
await self.blockchain.generate(1)
await self.generate(1)
await asyncio.wait([self.ledger.wait(tx) for tx in txs])
await self.assertBalance(self.account, '199.99876')
@ -74,7 +73,7 @@ class BasicTransactionTests(IntegrationTestCase):
)
await self.broadcast(tx)
await self.ledger.wait(tx)
await self.blockchain.generate(1)
await self.generate(1)
await self.ledger.wait(tx)
self.assertEqual(2, await self.account.get_utxo_count()) # 199 + change
@ -92,7 +91,7 @@ class BasicTransactionTests(IntegrationTestCase):
self.blockchain.send_to_address(address, 1.1) for address in addresses[:5]
))
await asyncio.wait([self.on_transaction_id(txid) for txid in txids]) # mempool
await self.blockchain.generate(1)
await self.generate(1)
await asyncio.wait([self.on_transaction_id(txid) for txid in txids]) # confirmed
await self.assertBalance(account1, '5.5')
await self.assertBalance(account2, '0.0')
@ -107,7 +106,7 @@ class BasicTransactionTests(IntegrationTestCase):
)
await self.broadcast(tx)
await self.ledger.wait(tx) # mempool
await self.blockchain.generate(1)
await self.generate(1)
await self.ledger.wait(tx) # confirmed
await self.assertBalance(account1, '3.499802')
@ -121,7 +120,7 @@ class BasicTransactionTests(IntegrationTestCase):
)
await self.broadcast(tx)
await self.ledger.wait(tx) # mempool
await self.blockchain.generate(1)
await self.generate(1)
await self.ledger.wait(tx) # confirmed
tx = (await account1.get_transactions(include_is_my_input=True, include_is_my_output=True))[1]
@ -133,11 +132,11 @@ class BasicTransactionTests(IntegrationTestCase):
self.assertTrue(tx.outputs[1].is_internal_transfer)
async def test_history_edge_cases(self):
await self.blockchain.generate(300)
await self.generate(300)
await self.assertBalance(self.account, '0.0')
address = await self.account.receiving.get_or_create_usable_address()
# evil trick: mempool is unsorted on real life, but same order between python instances. reproduce it
original_summary = self.conductor.spv_node.server.bp.mempool.transaction_summaries
original_summary = self.conductor.spv_node.server.mempool.transaction_summaries
def random_summary(*args, **kwargs):
summary = original_summary(*args, **kwargs)
@ -146,7 +145,7 @@ class BasicTransactionTests(IntegrationTestCase):
while summary == ordered:
random.shuffle(summary)
return summary
self.conductor.spv_node.server.bp.mempool.transaction_summaries = random_summary
self.conductor.spv_node.server.mempool.transaction_summaries = random_summary
# 10 unconfirmed txs, all from blockchain wallet
sends = [self.blockchain.send_to_address(address, 10) for _ in range(10)]
# use batching to reduce issues with send_to_address on cli
@ -199,7 +198,7 @@ class BasicTransactionTests(IntegrationTestCase):
async def test_sqlite_coin_chooser(self):
wallet_manager = WalletManager([self.wallet], {self.ledger.get_id(): self.ledger})
await self.blockchain.generate(300)
await self.generate(300)
await self.assertBalance(self.account, '0.0')
address = await self.account.receiving.get_or_create_usable_address()

View file

@ -126,28 +126,28 @@ class TestRevertablePrefixDB(unittest.TestCase):
self.assertIsNone(self.db.claim_takeover.get(name))
self.assertEqual(10000000, self.db.claim_takeover.get_pending(name).height)
self.db.commit(10000000)
self.db.commit(10000000, b'\x00' * 32)
self.assertEqual(10000000, self.db.claim_takeover.get(name).height)
self.db.claim_takeover.stage_delete((name,), (claim_hash1, takeover_height))
self.db.claim_takeover.stage_put((name,), (claim_hash2, takeover_height + 1))
self.db.claim_takeover.stage_delete((name,), (claim_hash2, takeover_height + 1))
self.db.commit(10000001)
self.db.commit(10000001, b'\x01' * 32)
self.assertIsNone(self.db.claim_takeover.get(name))
self.db.claim_takeover.stage_put((name,), (claim_hash3, takeover_height + 2))
self.db.commit(10000002)
self.db.commit(10000002, b'\x02' * 32)
self.assertEqual(10000002, self.db.claim_takeover.get(name).height)
self.db.claim_takeover.stage_delete((name,), (claim_hash3, takeover_height + 2))
self.db.claim_takeover.stage_put((name,), (claim_hash2, takeover_height + 3))
self.db.commit(10000003)
self.db.commit(10000003, b'\x03' * 32)
self.assertEqual(10000003, self.db.claim_takeover.get(name).height)
self.db.rollback(10000003)
self.db.rollback(10000003, b'\x03' * 32)
self.assertEqual(10000002, self.db.claim_takeover.get(name).height)
self.db.rollback(10000002)
self.db.rollback(10000002, b'\x02' * 32)
self.assertIsNone(self.db.claim_takeover.get(name))
self.db.rollback(10000001)
self.db.rollback(10000001, b'\x01' * 32)
self.assertEqual(10000000, self.db.claim_takeover.get(name).height)
self.db.rollback(10000000)
self.db.rollback(10000000, b'\x00' * 32)
self.assertIsNone(self.db.claim_takeover.get(name))