This commit is contained in:
Jack Robison 2022-01-12 11:59:44 -05:00
parent c0ce27ccf3
commit 98f8fd0556
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
13 changed files with 180 additions and 146 deletions

View file

@ -285,6 +285,17 @@ class IntegrationTestCase(AsyncioTestCase):
lambda e: e.tx.id == tx.id and e.address == address lambda e: e.tx.id == tx.id and e.address == address
) )
async def generate(self, blocks):
""" Ask lbrycrd to generate some blocks and wait until ledger has them. """
print("generate", blocks)
prepare = self.ledger.on_header.where(self.blockchain.is_expected_block)
self.conductor.spv_node.server.synchronized.clear()
await self.blockchain.generate(blocks)
await prepare # no guarantee that it didn't happen already, so start waiting from before calling generate
print("wait for synchronized")
await self.conductor.spv_node.server.synchronized.wait()
print("finished waiting for synchronized")
class FakeExchangeRateManager(ExchangeRateManager): class FakeExchangeRateManager(ExchangeRateManager):
@ -456,8 +467,7 @@ class CommandTestCase(IntegrationTestCase):
async def confirm_tx(self, txid, ledger=None): async def confirm_tx(self, txid, ledger=None):
""" Wait for tx to be in mempool, then generate a block, wait for tx to be in a block. """ """ Wait for tx to be in mempool, then generate a block, wait for tx to be in a block. """
await self.on_transaction_id(txid, ledger) await self.on_transaction_id(txid, ledger)
await self.generate(1) await asyncio.wait([self.generate(1), self.on_transaction_id(txid, ledger)], timeout=5)
await self.on_transaction_id(txid, ledger)
return txid return txid
async def on_transaction_dict(self, tx): async def on_transaction_dict(self, tx):
@ -472,12 +482,6 @@ class CommandTestCase(IntegrationTestCase):
addresses.add(txo['address']) addresses.add(txo['address'])
return list(addresses) return list(addresses)
async def generate(self, blocks):
""" Ask lbrycrd to generate some blocks and wait until ledger has them. """
prepare = self.ledger.on_header.where(self.blockchain.is_expected_block)
await self.blockchain.generate(blocks)
await prepare # no guarantee that it didn't happen already, so start waiting from before calling generate
async def blockchain_claim_name(self, name: str, value: str, amount: str, confirm=True): async def blockchain_claim_name(self, name: str, value: str, amount: str, confirm=True):
txid = await self.blockchain._cli_cmnd('claimname', name, value, amount) txid = await self.blockchain._cli_cmnd('claimname', name, value, amount)
if confirm: if confirm:

View file

@ -1,5 +1,5 @@
__hub_url__ = ( __hub_url__ = (
"https://github.com/lbryio/hub/releases/download/v0.2022.01.21.1/hub" "https://github.com/lbryio/hub/releases/download/v0.2022.01.21.1/hub"
) )
from .node import Conductor from lbry.wallet.orchstr8.node import Conductor
from .service import ConductorService from lbry.wallet.orchstr8.service import ConductorService

View file

@ -16,11 +16,13 @@ import urllib.request
from uuid import uuid4 from uuid import uuid4
import lbry import lbry
from lbry.wallet.server.server import Server
from lbry.wallet.server.env import Env from lbry.wallet.server.env import Env
from lbry.wallet import Wallet, Ledger, RegTestLedger, WalletManager, Account, BlockHeightEvent from lbry.wallet import Wallet, Ledger, RegTestLedger, WalletManager, Account, BlockHeightEvent
from lbry.conf import KnownHubsList, Config from lbry.conf import KnownHubsList, Config
from lbry.wallet.orchstr8 import __hub_url__ from lbry.wallet.orchstr8 import __hub_url__
from lbry.wallet.server.block_processor import BlockProcessor
from lbry.wallet.server.chain_reader import BlockchainReaderServer
from lbry.wallet.server.db.elasticsearch.sync import ElasticWriter
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -189,49 +191,48 @@ class SPVNode:
self.coin_class = coin_class self.coin_class = coin_class
self.controller = None self.controller = None
self.data_path = None self.data_path = None
self.server = None self.server: Optional[BlockchainReaderServer] = None
self.writer: Optional[BlockProcessor] = None
self.es_writer: Optional[ElasticWriter] = None
self.hostname = 'localhost' self.hostname = 'localhost'
self.port = 50001 + node_number # avoid conflict with default daemon self.port = 50001 + node_number # avoid conflict with default daemon
self.udp_port = self.port self.udp_port = self.port
self.session_timeout = 600 self.session_timeout = 600
self.rpc_port = '0' # disabled by default
self.stopped = False self.stopped = False
self.index_name = uuid4().hex self.index_name = uuid4().hex
async def start(self, blockchain_node: 'BlockchainNode', extraconf=None): async def start(self, blockchain_node: 'BlockchainNode', extraconf=None):
self.data_path = tempfile.mkdtemp() self.data_path = tempfile.mkdtemp()
conf = { conf = {
'DESCRIPTION': '', 'description': '',
'PAYMENT_ADDRESS': '', 'payment_address': '',
'DAILY_FEE': '0', 'daily_fee': '0',
'DB_DIRECTORY': self.data_path, 'db_dir': self.data_path,
'DAEMON_URL': blockchain_node.rpc_url, 'daemon_url': blockchain_node.rpc_url,
'REORG_LIMIT': '100', 'reorg_limit': 100,
'HOST': self.hostname, 'host': self.hostname,
'TCP_PORT': str(self.port), 'tcp_port': self.port,
'UDP_PORT': str(self.udp_port), 'udp_port': self.udp_port,
'SESSION_TIMEOUT': str(self.session_timeout), 'session_timeout': self.session_timeout,
'MAX_QUERY_WORKERS': '0', 'max_query_workers': 0,
'INDIVIDUAL_TAG_INDEXES': '', 'es_index_prefix': self.index_name,
'RPC_PORT': self.rpc_port,
'ES_INDEX_PREFIX': self.index_name,
'ES_MODE': 'writer',
} }
if extraconf: if extraconf:
conf.update(extraconf) conf.update(extraconf)
# TODO: don't use os.environ self.writer = BlockProcessor(Env(self.coin_class, es_mode='writer', **conf))
os.environ.update(conf) self.server = BlockchainReaderServer(Env(self.coin_class, es_mode='reader', **conf))
self.server = Server(Env(self.coin_class)) self.es_writer = ElasticWriter(Env(self.coin_class, es_mode='reader', **conf))
self.server.bp.mempool.refresh_secs = self.server.bp.prefetcher.polling_delay = 0.5 await self.writer.open()
await self.server.start() await self.writer.start()
await asyncio.wait([self.server.start(), self.es_writer.start()])
async def stop(self, cleanup=True): async def stop(self, cleanup=True):
if self.stopped: if self.stopped:
return return
try: try:
await self.server.db.search_index.delete_index() await self.es_writer.stop(delete_index=True)
await self.server.db.search_index.stop()
await self.server.stop() await self.server.stop()
await self.writer.stop()
self.stopped = True self.stopped = True
finally: finally:
cleanup and self.cleanup() cleanup and self.cleanup()

View file

@ -1,12 +1,12 @@
#!/bin/bash #!/bin/bash
SNAPSHOT_HEIGHT="1049658" SNAPSHOT_HEIGHT="1072108"
HUB_VOLUME_PATH="/var/lib/docker/volumes/${USER}_wallet_server" HUB_VOLUME_PATH="/var/lib/docker/volumes/${USER}_wallet_server"
ES_VOLUME_PATH="/var/lib/docker/volumes/${USER}_es01" ES_VOLUME_PATH="/var/lib/docker/volumes/${USER}_es01"
SNAPSHOT_TAR_NAME="wallet_server_snapshot_${SNAPSHOT_HEIGHT}.tar" SNAPSHOT_TAR_NAME="wallet_server_snapshot_${SNAPSHOT_HEIGHT}.tar.gz"
ES_SNAPSHOT_TAR_NAME="es_snapshot_${SNAPSHOT_HEIGHT}.tar" ES_SNAPSHOT_TAR_NAME="es_snapshot_${SNAPSHOT_HEIGHT}.tar.gz"
SNAPSHOT_URL="https://snapshots.lbry.com/hub/${SNAPSHOT_TAR_NAME}" SNAPSHOT_URL="https://snapshots.lbry.com/hub/${SNAPSHOT_TAR_NAME}"
ES_SNAPSHOT_URL="https://snapshots.lbry.com/hub/${ES_SNAPSHOT_TAR_NAME}" ES_SNAPSHOT_URL="https://snapshots.lbry.com/hub/${ES_SNAPSHOT_TAR_NAME}"

View file

@ -36,7 +36,7 @@ class BlockchainReorganizationTests(CommandTestCase):
self.assertEqual(self.ledger.headers.height, height) self.assertEqual(self.ledger.headers.height, height)
await self.assertBlockHash(height) await self.assertBlockHash(height)
await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode()) await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode())
await self.blockchain.generate(2) await self.generate(2)
await self.ledger.on_header.where(lambda e: e.height == 207) await self.ledger.on_header.where(lambda e: e.height == 207)
self.assertEqual(self.ledger.headers.height, 207) self.assertEqual(self.ledger.headers.height, 207)
await self.assertBlockHash(206) await self.assertBlockHash(206)
@ -45,14 +45,14 @@ class BlockchainReorganizationTests(CommandTestCase):
# invalidate current block, move forward 3 # invalidate current block, move forward 3
await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode()) await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode())
await self.blockchain.generate(3) await self.generate(3)
await self.ledger.on_header.where(lambda e: e.height == 208) await self.ledger.on_header.where(lambda e: e.height == 208)
self.assertEqual(self.ledger.headers.height, 208) self.assertEqual(self.ledger.headers.height, 208)
await self.assertBlockHash(206) await self.assertBlockHash(206)
await self.assertBlockHash(207) await self.assertBlockHash(207)
await self.assertBlockHash(208) await self.assertBlockHash(208)
self.assertEqual(2, bp.reorg_count_metric._samples()[0][2]) self.assertEqual(2, bp.reorg_count_metric._samples()[0][2])
await self.blockchain.generate(3) await self.generate(3)
await self.ledger.on_header.where(lambda e: e.height == 211) await self.ledger.on_header.where(lambda e: e.height == 211)
await self.assertBlockHash(209) await self.assertBlockHash(209)
await self.assertBlockHash(210) await self.assertBlockHash(210)
@ -61,7 +61,7 @@ class BlockchainReorganizationTests(CommandTestCase):
'still-valid', '1.0', file_path=self.create_upload_file(data=b'hi!') 'still-valid', '1.0', file_path=self.create_upload_file(data=b'hi!')
) )
await self.ledger.wait(still_valid) await self.ledger.wait(still_valid)
await self.blockchain.generate(1) await self.generate(1)
await self.ledger.on_header.where(lambda e: e.height == 212) await self.ledger.on_header.where(lambda e: e.height == 212)
claim_id = still_valid.outputs[0].claim_id claim_id = still_valid.outputs[0].claim_id
c1 = (await self.resolve(f'still-valid#{claim_id}'))['claim_id'] c1 = (await self.resolve(f'still-valid#{claim_id}'))['claim_id']
@ -70,7 +70,7 @@ class BlockchainReorganizationTests(CommandTestCase):
self.assertTrue(c1 == c2 == c3) self.assertTrue(c1 == c2 == c3)
abandon_tx = await self.daemon.jsonrpc_stream_abandon(claim_id=claim_id) abandon_tx = await self.daemon.jsonrpc_stream_abandon(claim_id=claim_id)
await self.blockchain.generate(1) await self.generate(1)
await self.ledger.on_header.where(lambda e: e.height == 213) await self.ledger.on_header.where(lambda e: e.height == 213)
c1 = await self.resolve(f'still-valid#{still_valid.outputs[0].claim_id}') c1 = await self.resolve(f'still-valid#{still_valid.outputs[0].claim_id}')
c2 = await self.daemon.jsonrpc_resolve([f'still-valid#{claim_id[:2]}']) c2 = await self.daemon.jsonrpc_resolve([f'still-valid#{claim_id[:2]}'])
@ -113,7 +113,7 @@ class BlockchainReorganizationTests(CommandTestCase):
# reorg the last block dropping our claim tx # reorg the last block dropping our claim tx
await self.blockchain.invalidate_block(invalidated_block_hash) await self.blockchain.invalidate_block(invalidated_block_hash)
await self.blockchain.clear_mempool() await self.blockchain.clear_mempool()
await self.blockchain.generate(2) await self.generate(2)
# wait for the client to catch up and verify the reorg # wait for the client to catch up and verify the reorg
await asyncio.wait_for(self.on_header(209), 3.0) await asyncio.wait_for(self.on_header(209), 3.0)
@ -142,7 +142,7 @@ class BlockchainReorganizationTests(CommandTestCase):
# broadcast the claim in a different block # broadcast the claim in a different block
new_txid = await self.blockchain.sendrawtransaction(hexlify(broadcast_tx.raw).decode()) new_txid = await self.blockchain.sendrawtransaction(hexlify(broadcast_tx.raw).decode())
self.assertEqual(broadcast_tx.id, new_txid) self.assertEqual(broadcast_tx.id, new_txid)
await self.blockchain.generate(1) await self.generate(1)
# wait for the client to catch up # wait for the client to catch up
await asyncio.wait_for(self.on_header(210), 1.0) await asyncio.wait_for(self.on_header(210), 1.0)
@ -192,7 +192,7 @@ class BlockchainReorganizationTests(CommandTestCase):
# reorg the last block dropping our claim tx # reorg the last block dropping our claim tx
await self.blockchain.invalidate_block(invalidated_block_hash) await self.blockchain.invalidate_block(invalidated_block_hash)
await self.blockchain.clear_mempool() await self.blockchain.clear_mempool()
await self.blockchain.generate(2) await self.generate(2)
# wait for the client to catch up and verify the reorg # wait for the client to catch up and verify the reorg
await asyncio.wait_for(self.on_header(209), 3.0) await asyncio.wait_for(self.on_header(209), 3.0)
@ -221,7 +221,7 @@ class BlockchainReorganizationTests(CommandTestCase):
# broadcast the claim in a different block # broadcast the claim in a different block
new_txid = await self.blockchain.sendrawtransaction(hexlify(broadcast_tx.raw).decode()) new_txid = await self.blockchain.sendrawtransaction(hexlify(broadcast_tx.raw).decode())
self.assertEqual(broadcast_tx.id, new_txid) self.assertEqual(broadcast_tx.id, new_txid)
await self.blockchain.generate(1) await self.generate(1)
# wait for the client to catch up # wait for the client to catch up
await asyncio.wait_for(self.on_header(210), 1.0) await asyncio.wait_for(self.on_header(210), 1.0)

View file

@ -16,7 +16,7 @@ class NetworkTests(IntegrationTestCase):
async def test_remote_height_updated_automagically(self): async def test_remote_height_updated_automagically(self):
initial_height = self.ledger.network.remote_height initial_height = self.ledger.network.remote_height
await self.blockchain.generate(1) await self.generate(1)
await self.ledger.network.on_header.first await self.ledger.network.on_header.first
self.assertEqual(self.ledger.network.remote_height, initial_height + 1) self.assertEqual(self.ledger.network.remote_height, initial_height + 1)
@ -85,8 +85,8 @@ class ReconnectTests(IntegrationTestCase):
async def test_direct_sync(self): async def test_direct_sync(self):
await self.ledger.stop() await self.ledger.stop()
initial_height = self.ledger.local_height_including_downloaded_height initial_height = self.ledger.local_height_including_downloaded_height
await self.blockchain.generate(100) await self.generate(100)
while self.conductor.spv_node.server.session_mgr.notified_height < initial_height + 99: # off by 1 while self.conductor.spv_node.server.session_manager.notified_height < initial_height + 99: # off by 1
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
self.assertEqual(initial_height, self.ledger.local_height_including_downloaded_height) self.assertEqual(initial_height, self.ledger.local_height_including_downloaded_height)
await self.ledger.headers.open() await self.ledger.headers.open()
@ -105,7 +105,7 @@ class ReconnectTests(IntegrationTestCase):
# await self.ledger.resolve([], 'derp') # await self.ledger.resolve([], 'derp')
# self.assertTrue(self.ledger.network.is_connected) # self.assertTrue(self.ledger.network.is_connected)
await asyncio.wait_for(self.on_transaction_id(sendtxid), 10.0) # mempool await asyncio.wait_for(self.on_transaction_id(sendtxid), 10.0) # mempool
await self.blockchain.generate(1) await self.generate(1)
await self.on_transaction_id(sendtxid) # confirmed await self.on_transaction_id(sendtxid) # confirmed
self.assertLess(self.ledger.network.client.response_time, 1) # response time properly set lower, we are fine self.assertLess(self.ledger.network.client.response_time, 1) # response time properly set lower, we are fine
@ -123,7 +123,7 @@ class ReconnectTests(IntegrationTestCase):
await self.ledger.network.get_transaction(sendtxid) await self.ledger.network.get_transaction(sendtxid)
# * goes to pick some water outside... * time passes by and another donation comes in # * goes to pick some water outside... * time passes by and another donation comes in
sendtxid = await self.blockchain.send_to_address(address1, 42) sendtxid = await self.blockchain.send_to_address(address1, 42)
await self.blockchain.generate(1) await self.generate(1)
# (this is just so the test doesn't hang forever if it doesn't reconnect) # (this is just so the test doesn't hang forever if it doesn't reconnect)
if not self.ledger.network.is_connected: if not self.ledger.network.is_connected:
await asyncio.wait_for(self.ledger.network.on_connected.first, timeout=10.0) await asyncio.wait_for(self.ledger.network.on_connected.first, timeout=10.0)
@ -169,7 +169,7 @@ class UDPServerFailDiscoveryTest(AsyncioTestCase):
self.addCleanup(conductor.stop_blockchain) self.addCleanup(conductor.stop_blockchain)
await conductor.start_spv() await conductor.start_spv()
self.addCleanup(conductor.stop_spv) self.addCleanup(conductor.stop_spv)
self.assertFalse(conductor.spv_node.server.bp.status_server.is_running) self.assertFalse(conductor.spv_node.server.reader.status_server.is_running)
await asyncio.wait_for(conductor.start_wallet(), timeout=5) await asyncio.wait_for(conductor.start_wallet(), timeout=5)
self.addCleanup(conductor.stop_wallet) self.addCleanup(conductor.stop_wallet)
self.assertTrue(conductor.wallet_node.ledger.network.is_connected) self.assertTrue(conductor.wallet_node.ledger.network.is_connected)

View file

@ -63,7 +63,7 @@ class SyncTests(IntegrationTestCase):
await self.assertBalance(account1, '1.0') await self.assertBalance(account1, '1.0')
await self.assertBalance(account2, '1.0') await self.assertBalance(account2, '1.0')
await self.blockchain.generate(1) await self.generate(1)
# pay 0.01 from main node to receiving node, would have increased change addresses # pay 0.01 from main node to receiving node, would have increased change addresses
address0 = (await account0.receiving.get_addresses())[0] address0 = (await account0.receiving.get_addresses())[0]
@ -79,7 +79,7 @@ class SyncTests(IntegrationTestCase):
account1.ledger.wait(tx), account1.ledger.wait(tx),
account2.ledger.wait(tx), account2.ledger.wait(tx),
]) ])
await self.blockchain.generate(1) await self.generate(1)
await asyncio.wait([ await asyncio.wait([
account0.ledger.wait(tx), account0.ledger.wait(tx),
account1.ledger.wait(tx), account1.ledger.wait(tx),
@ -92,7 +92,7 @@ class SyncTests(IntegrationTestCase):
await self.assertBalance(account1, '0.989876') await self.assertBalance(account1, '0.989876')
await self.assertBalance(account2, '0.989876') await self.assertBalance(account2, '0.989876')
await self.blockchain.generate(1) await self.generate(1)
# create a new mirror node and see if it syncs to same balance from scratch # create a new mirror node and see if it syncs to same balance from scratch
node3 = await self.make_wallet_node(account1.seed) node3 = await self.make_wallet_node(account1.seed)

View file

@ -11,7 +11,7 @@ from lbry.wallet.dewies import dict_values_to_lbc
class WalletCommands(CommandTestCase): class WalletCommands(CommandTestCase):
async def test_wallet_create_and_add_subscribe(self): async def test_wallet_create_and_add_subscribe(self):
session = next(iter(self.conductor.spv_node.server.session_mgr.sessions.values())) session = next(iter(self.conductor.spv_node.server.session_manager.sessions.values()))
self.assertEqual(len(session.hashX_subs), 27) self.assertEqual(len(session.hashX_subs), 27)
wallet = await self.daemon.jsonrpc_wallet_create('foo', create_account=True, single_key=True) wallet = await self.daemon.jsonrpc_wallet_create('foo', create_account=True, single_key=True)
self.assertEqual(len(session.hashX_subs), 28) self.assertEqual(len(session.hashX_subs), 28)

View file

@ -25,13 +25,13 @@ class TestSessions(IntegrationTestCase):
) )
await session.create_connection() await session.create_connection()
await session.send_request('server.banner', ()) await session.send_request('server.banner', ())
self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 1) self.assertEqual(len(self.conductor.spv_node.server.session_manager.sessions), 1)
self.assertFalse(session.is_closing()) self.assertFalse(session.is_closing())
await asyncio.sleep(1.1) await asyncio.sleep(1.1)
with self.assertRaises(asyncio.TimeoutError): with self.assertRaises(asyncio.TimeoutError):
await session.send_request('server.banner', ()) await session.send_request('server.banner', ())
self.assertTrue(session.is_closing()) self.assertTrue(session.is_closing())
self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 0) self.assertEqual(len(self.conductor.spv_node.server.session_manager.sessions), 0)
async def test_proper_version(self): async def test_proper_version(self):
info = await self.ledger.network.get_server_features() info = await self.ledger.network.get_server_features()
@ -186,7 +186,7 @@ class TestHubDiscovery(CommandTestCase):
self.daemon.ledger.network.client.server_address_and_port, ('127.0.0.1', kp_final_node.port) self.daemon.ledger.network.client.server_address_and_port, ('127.0.0.1', kp_final_node.port)
) )
kp_final_node.server.session_mgr._notify_peer('127.0.0.1:9988') kp_final_node.server.session_manager._notify_peer('127.0.0.1:9988')
await self.daemon.ledger.network.on_hub.first await self.daemon.ledger.network.on_hub.first
await asyncio.sleep(0.5) # wait for above event to be processed by other listeners await asyncio.sleep(0.5) # wait for above event to be processed by other listeners
self.assertEqual( self.assertEqual(

View file

@ -23,7 +23,7 @@ class BaseResolveTestCase(CommandTestCase):
def assertMatchESClaim(self, claim_from_es, claim_from_db): def assertMatchESClaim(self, claim_from_es, claim_from_db):
self.assertEqual(claim_from_es['claim_hash'][::-1].hex(), claim_from_db.claim_hash.hex()) self.assertEqual(claim_from_es['claim_hash'][::-1].hex(), claim_from_db.claim_hash.hex())
self.assertEqual(claim_from_es['claim_id'], claim_from_db.claim_hash.hex()) self.assertEqual(claim_from_es['claim_id'], claim_from_db.claim_hash.hex())
self.assertEqual(claim_from_es['activation_height'], claim_from_db.activation_height) self.assertEqual(claim_from_es['activation_height'], claim_from_db.activation_height, f"es height: {claim_from_es['activation_height']}, rocksdb height: {claim_from_db.activation_height}")
self.assertEqual(claim_from_es['last_take_over_height'], claim_from_db.last_takeover_height) self.assertEqual(claim_from_es['last_take_over_height'], claim_from_db.last_takeover_height)
self.assertEqual(claim_from_es['tx_id'], claim_from_db.tx_hash[::-1].hex()) self.assertEqual(claim_from_es['tx_id'], claim_from_db.tx_hash[::-1].hex())
self.assertEqual(claim_from_es['tx_nout'], claim_from_db.position) self.assertEqual(claim_from_es['tx_nout'], claim_from_db.position)
@ -44,44 +44,44 @@ class BaseResolveTestCase(CommandTestCase):
if claim_id is None: if claim_id is None:
self.assertIn('error', other) self.assertIn('error', other)
self.assertEqual(other['error']['name'], 'NOT_FOUND') self.assertEqual(other['error']['name'], 'NOT_FOUND')
claims_from_es = (await self.conductor.spv_node.server.bp.db.search_index.search(name=name))[0] claims_from_es = (await self.conductor.spv_node.server.session_manager.search_index.search(name=name))[0]
claims_from_es = [c['claim_hash'][::-1].hex() for c in claims_from_es] claims_from_es = [c['claim_hash'][::-1].hex() for c in claims_from_es]
self.assertNotIn(claim_id, claims_from_es) self.assertNotIn(claim_id, claims_from_es)
else: else:
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(claim_id=claim_id) claim_from_es = await self.conductor.spv_node.server.session_manager.search_index.search(claim_id=claim_id)
self.assertEqual(claim_id, other['claim_id']) self.assertEqual(claim_id, other['claim_id'])
self.assertEqual(claim_id, claim_from_es[0][0]['claim_hash'][::-1].hex()) self.assertEqual(claim_id, claim_from_es[0][0]['claim_hash'][::-1].hex())
async def assertNoClaimForName(self, name: str): async def assertNoClaimForName(self, name: str):
lbrycrd_winning = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name)) lbrycrd_winning = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))
stream, channel, _, _ = await self.conductor.spv_node.server.bp.db.resolve(name) stream, channel, _, _ = await self.conductor.spv_node.server.db.resolve(name)
self.assertNotIn('claimId', lbrycrd_winning) self.assertNotIn('claimId', lbrycrd_winning)
if stream is not None: if stream is not None:
self.assertIsInstance(stream, LookupError) self.assertIsInstance(stream, LookupError)
else: else:
self.assertIsInstance(channel, LookupError) self.assertIsInstance(channel, LookupError)
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(name=name) claim_from_es = await self.conductor.spv_node.server.session_manager.search_index.search(name=name)
self.assertListEqual([], claim_from_es[0]) self.assertListEqual([], claim_from_es[0])
async def assertNoClaim(self, claim_id: str): async def assertNoClaim(self, claim_id: str):
self.assertDictEqual( self.assertDictEqual(
{}, json.loads(await self.blockchain._cli_cmnd('getclaimbyid', claim_id)) {}, json.loads(await self.blockchain._cli_cmnd('getclaimbyid', claim_id))
) )
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(claim_id=claim_id) claim_from_es = await self.conductor.spv_node.server.session_manager.search_index.search(claim_id=claim_id)
self.assertListEqual([], claim_from_es[0]) self.assertListEqual([], claim_from_es[0])
claim = await self.conductor.spv_node.server.bp.db.fs_getclaimbyid(claim_id) claim = await self.conductor.spv_node.server.db.fs_getclaimbyid(claim_id)
self.assertIsNone(claim) self.assertIsNone(claim)
async def assertMatchWinningClaim(self, name): async def assertMatchWinningClaim(self, name):
expected = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name)) expected = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))
stream, channel, _, _ = await self.conductor.spv_node.server.bp.db.resolve(name) stream, channel, _, _ = await self.conductor.spv_node.server.db.resolve(name)
claim = stream if stream else channel claim = stream if stream else channel
await self._assertMatchClaim(expected, claim) await self._assertMatchClaim(expected, claim)
return claim return claim
async def _assertMatchClaim(self, expected, claim): async def _assertMatchClaim(self, expected, claim):
self.assertMatchDBClaim(expected, claim) self.assertMatchDBClaim(expected, claim)
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search( claim_from_es = await self.conductor.spv_node.server.session_manager.search_index.search(
claim_id=claim.claim_hash.hex() claim_id=claim.claim_hash.hex()
) )
self.assertEqual(len(claim_from_es[0]), 1) self.assertEqual(len(claim_from_es[0]), 1)
@ -90,7 +90,7 @@ class BaseResolveTestCase(CommandTestCase):
async def assertMatchClaim(self, claim_id, is_active_in_lbrycrd=True): async def assertMatchClaim(self, claim_id, is_active_in_lbrycrd=True):
expected = json.loads(await self.blockchain._cli_cmnd('getclaimbyid', claim_id)) expected = json.loads(await self.blockchain._cli_cmnd('getclaimbyid', claim_id))
claim = await self.conductor.spv_node.server.bp.db.fs_getclaimbyid(claim_id) claim = await self.conductor.spv_node.server.db.fs_getclaimbyid(claim_id)
if is_active_in_lbrycrd: if is_active_in_lbrycrd:
if not expected: if not expected:
self.assertIsNone(claim) self.assertIsNone(claim)
@ -98,7 +98,7 @@ class BaseResolveTestCase(CommandTestCase):
self.assertMatchDBClaim(expected, claim) self.assertMatchDBClaim(expected, claim)
else: else:
self.assertDictEqual({}, expected) self.assertDictEqual({}, expected)
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search( claim_from_es = await self.conductor.spv_node.server.session_manager.search_index.search(
claim_id=claim.claim_hash.hex() claim_id=claim.claim_hash.hex()
) )
self.assertEqual(len(claim_from_es[0]), 1) self.assertEqual(len(claim_from_es[0]), 1)
@ -116,7 +116,7 @@ class BaseResolveTestCase(CommandTestCase):
def _check_supports(self, claim_id, lbrycrd_supports, es_support_amount, is_active_in_lbrycrd=True): def _check_supports(self, claim_id, lbrycrd_supports, es_support_amount, is_active_in_lbrycrd=True):
total_amount = 0 total_amount = 0
db = self.conductor.spv_node.server.bp.db db = self.conductor.spv_node.server.db
for i, (tx_num, position, amount) in enumerate(db.get_supports(bytes.fromhex(claim_id))): for i, (tx_num, position, amount) in enumerate(db.get_supports(bytes.fromhex(claim_id))):
total_amount += amount total_amount += amount
@ -131,7 +131,7 @@ class BaseResolveTestCase(CommandTestCase):
async def assertMatchClaimsForName(self, name): async def assertMatchClaimsForName(self, name):
expected = json.loads(await self.blockchain._cli_cmnd('getclaimsforname', name)) expected = json.loads(await self.blockchain._cli_cmnd('getclaimsforname', name))
db = self.conductor.spv_node.server.bp.db db = self.conductor.spv_node.server.db
# self.assertEqual(len(expected['claims']), len(db_claims.claims)) # self.assertEqual(len(expected['claims']), len(db_claims.claims))
# self.assertEqual(expected['lastTakeoverHeight'], db_claims.lastTakeoverHeight) # self.assertEqual(expected['lastTakeoverHeight'], db_claims.lastTakeoverHeight)
last_takeover = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))['lastTakeoverHeight'] last_takeover = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))['lastTakeoverHeight']
@ -143,7 +143,7 @@ class BaseResolveTestCase(CommandTestCase):
claim = db._fs_get_claim_by_hash(claim_hash) claim = db._fs_get_claim_by_hash(claim_hash)
self.assertMatchDBClaim(c, claim) self.assertMatchDBClaim(c, claim)
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search( claim_from_es = await self.conductor.spv_node.server.session_manager.search_index.search(
claim_id=c['claimId'] claim_id=c['claimId']
) )
self.assertEqual(len(claim_from_es[0]), 1) self.assertEqual(len(claim_from_es[0]), 1)
@ -151,6 +151,17 @@ class BaseResolveTestCase(CommandTestCase):
self.assertMatchESClaim(claim_from_es[0][0], claim) self.assertMatchESClaim(claim_from_es[0][0], claim)
self._check_supports(c['claimId'], c['supports'], claim_from_es[0][0]['support_amount']) self._check_supports(c['claimId'], c['supports'], claim_from_es[0][0]['support_amount'])
async def assertNameState(self, height: int, name: str, winning_claim_id: str, last_takeover_height: int,
non_winning_claims: List[ClaimStateValue]):
self.assertEqual(height, self.conductor.spv_node.server.db.db_height)
await self.assertMatchClaimIsWinning(name, winning_claim_id)
for non_winning in non_winning_claims:
claim = await self.assertMatchClaim(
non_winning.claim_id, is_active_in_lbrycrd=non_winning.active_in_lbrycrd
)
self.assertEqual(non_winning.activation_height, claim.activation_height)
self.assertEqual(last_takeover_height, claim.last_takeover_height)
class ResolveCommand(BaseResolveTestCase): class ResolveCommand(BaseResolveTestCase):
async def test_colliding_short_id(self): async def test_colliding_short_id(self):
@ -641,17 +652,6 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
async def create_stream_claim(self, amount: str, name='derp') -> str: async def create_stream_claim(self, amount: str, name='derp') -> str:
return (await self.stream_create(name, amount, allow_duplicate_name=True))['outputs'][0]['claim_id'] return (await self.stream_create(name, amount, allow_duplicate_name=True))['outputs'][0]['claim_id']
async def assertNameState(self, height: int, name: str, winning_claim_id: str, last_takeover_height: int,
non_winning_claims: List[ClaimStateValue]):
self.assertEqual(height, self.conductor.spv_node.server.bp.db.db_height)
await self.assertMatchClaimIsWinning(name, winning_claim_id)
for non_winning in non_winning_claims:
claim = await self.assertMatchClaim(
non_winning.claim_id, is_active_in_lbrycrd=non_winning.active_in_lbrycrd
)
self.assertEqual(non_winning.activation_height, claim.activation_height)
self.assertEqual(last_takeover_height, claim.last_takeover_height)
async def test_delay_takeover_with_update(self): async def test_delay_takeover_with_update(self):
name = 'derp' name = 'derp'
first_claim_id = await self.create_stream_claim('0.2', name) first_claim_id = await self.create_stream_claim('0.2', name)
@ -961,7 +961,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
) )
greater_than_or_equal_to_zero = [ greater_than_or_equal_to_zero = [
claim['claim_id'] for claim in ( claim['claim_id'] for claim in (
await self.conductor.spv_node.server.bp.db.search_index.search( await self.conductor.spv_node.server.session_manager.search_index.search(
channel_id=channel_id, fee_amount=">=0" channel_id=channel_id, fee_amount=">=0"
))[0] ))[0]
] ]
@ -969,7 +969,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
self.assertSetEqual(set(greater_than_or_equal_to_zero), {stream_with_no_fee, stream_with_fee}) self.assertSetEqual(set(greater_than_or_equal_to_zero), {stream_with_no_fee, stream_with_fee})
greater_than_zero = [ greater_than_zero = [
claim['claim_id'] for claim in ( claim['claim_id'] for claim in (
await self.conductor.spv_node.server.bp.db.search_index.search( await self.conductor.spv_node.server.session_manager.search_index.search(
channel_id=channel_id, fee_amount=">0" channel_id=channel_id, fee_amount=">0"
))[0] ))[0]
] ]
@ -977,7 +977,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
self.assertSetEqual(set(greater_than_zero), {stream_with_fee}) self.assertSetEqual(set(greater_than_zero), {stream_with_fee})
equal_to_zero = [ equal_to_zero = [
claim['claim_id'] for claim in ( claim['claim_id'] for claim in (
await self.conductor.spv_node.server.bp.db.search_index.search( await self.conductor.spv_node.server.session_manager.search_index.search(
channel_id=channel_id, fee_amount="<=0" channel_id=channel_id, fee_amount="<=0"
))[0] ))[0]
] ]
@ -995,7 +995,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
await self.blockchain.send_to_address(address, 400.0) await self.blockchain.send_to_address(address, 400.0)
await self.account.ledger.on_address.first await self.account.ledger.on_address.first
await self.generate(100) await self.generate(100)
self.assertEqual(800, self.conductor.spv_node.server.bp.db.db_height) self.assertEqual(800, self.conductor.spv_node.server.db.db_height)
# Block 801: Claim A for 10 LBC is accepted. # Block 801: Claim A for 10 LBC is accepted.
# It is the first claim, so it immediately becomes active and controlling. # It is the first claim, so it immediately becomes active and controlling.
@ -1007,10 +1007,10 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
# Its activation height is 1121 + min(4032, floor((1121-801) / 32)) = 1121 + 10 = 1131. # Its activation height is 1121 + min(4032, floor((1121-801) / 32)) = 1121 + 10 = 1131.
# State: A(10) is controlling, B(20) is accepted. # State: A(10) is controlling, B(20) is accepted.
await self.generate(32 * 10 - 1) await self.generate(32 * 10 - 1)
self.assertEqual(1120, self.conductor.spv_node.server.bp.db.db_height) self.assertEqual(1120, self.conductor.spv_node.server.db.db_height)
claim_id_B = (await self.stream_create(name, '20.0', allow_duplicate_name=True))['outputs'][0]['claim_id'] claim_id_B = (await self.stream_create(name, '20.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
claim_B, _, _, _ = await self.conductor.spv_node.server.bp.db.resolve(f"{name}:{claim_id_B}") claim_B, _, _, _ = await self.conductor.spv_node.server.db.resolve(f"{name}:{claim_id_B}")
self.assertEqual(1121, self.conductor.spv_node.server.bp.db.db_height) self.assertEqual(1121, self.conductor.spv_node.server.db.db_height)
self.assertEqual(1131, claim_B.activation_height) self.assertEqual(1131, claim_B.activation_height)
await self.assertMatchClaimIsWinning(name, claim_id_A) await self.assertMatchClaimIsWinning(name, claim_id_A)
@ -1018,33 +1018,33 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
# Since it is a support for the controlling claim, it activates immediately. # Since it is a support for the controlling claim, it activates immediately.
# State: A(10+14) is controlling, B(20) is accepted. # State: A(10+14) is controlling, B(20) is accepted.
await self.support_create(claim_id_A, bid='14.0') await self.support_create(claim_id_A, bid='14.0')
self.assertEqual(1122, self.conductor.spv_node.server.bp.db.db_height) self.assertEqual(1122, self.conductor.spv_node.server.db.db_height)
await self.assertMatchClaimIsWinning(name, claim_id_A) await self.assertMatchClaimIsWinning(name, claim_id_A)
# Block 1123: Claim C for 50 LBC is accepted. # Block 1123: Claim C for 50 LBC is accepted.
# The activation height is 1123 + min(4032, floor((1123-801) / 32)) = 1123 + 10 = 1133. # The activation height is 1123 + min(4032, floor((1123-801) / 32)) = 1123 + 10 = 1133.
# State: A(10+14) is controlling, B(20) is accepted, C(50) is accepted. # State: A(10+14) is controlling, B(20) is accepted, C(50) is accepted.
claim_id_C = (await self.stream_create(name, '50.0', allow_duplicate_name=True))['outputs'][0]['claim_id'] claim_id_C = (await self.stream_create(name, '50.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
self.assertEqual(1123, self.conductor.spv_node.server.bp.db.db_height) self.assertEqual(1123, self.conductor.spv_node.server.db.db_height)
claim_C, _, _, _ = await self.conductor.spv_node.server.bp.db.resolve(f"{name}:{claim_id_C}") claim_C, _, _, _ = await self.conductor.spv_node.server.db.resolve(f"{name}:{claim_id_C}")
self.assertEqual(1133, claim_C.activation_height) self.assertEqual(1133, claim_C.activation_height)
await self.assertMatchClaimIsWinning(name, claim_id_A) await self.assertMatchClaimIsWinning(name, claim_id_A)
await self.generate(7) await self.generate(7)
self.assertEqual(1130, self.conductor.spv_node.server.bp.db.db_height) self.assertEqual(1130, self.conductor.spv_node.server.db.db_height)
await self.assertMatchClaimIsWinning(name, claim_id_A) await self.assertMatchClaimIsWinning(name, claim_id_A)
await self.generate(1) await self.generate(1)
# Block 1131: Claim B activates. It has 20 LBC, while claim A has 24 LBC (10 original + 14 from support X). There is no takeover, and claim A remains controlling. # Block 1131: Claim B activates. It has 20 LBC, while claim A has 24 LBC (10 original + 14 from support X). There is no takeover, and claim A remains controlling.
# State: A(10+14) is controlling, B(20) is active, C(50) is accepted. # State: A(10+14) is controlling, B(20) is active, C(50) is accepted.
self.assertEqual(1131, self.conductor.spv_node.server.bp.db.db_height) self.assertEqual(1131, self.conductor.spv_node.server.db.db_height)
await self.assertMatchClaimIsWinning(name, claim_id_A) await self.assertMatchClaimIsWinning(name, claim_id_A)
# Block 1132: Claim D for 300 LBC is accepted. The activation height is 1132 + min(4032, floor((1132-801) / 32)) = 1132 + 10 = 1142. # Block 1132: Claim D for 300 LBC is accepted. The activation height is 1132 + min(4032, floor((1132-801) / 32)) = 1132 + 10 = 1142.
# State: A(10+14) is controlling, B(20) is active, C(50) is accepted, D(300) is accepted. # State: A(10+14) is controlling, B(20) is active, C(50) is accepted, D(300) is accepted.
claim_id_D = (await self.stream_create(name, '300.0', allow_duplicate_name=True))['outputs'][0]['claim_id'] claim_id_D = (await self.stream_create(name, '300.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
self.assertEqual(1132, self.conductor.spv_node.server.bp.db.db_height) self.assertEqual(1132, self.conductor.spv_node.server.db.db_height)
claim_D, _, _, _ = await self.conductor.spv_node.server.bp.db.resolve(f"{name}:{claim_id_D}") claim_D, _, _, _ = await self.conductor.spv_node.server.db.resolve(f"{name}:{claim_id_D}")
self.assertEqual(False, claim_D.is_controlling) self.assertEqual(False, claim_D.is_controlling)
self.assertEqual(801, claim_D.last_takeover_height) self.assertEqual(801, claim_D.last_takeover_height)
self.assertEqual(1142, claim_D.activation_height) self.assertEqual(1142, claim_D.activation_height)
@ -1053,8 +1053,8 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
# Block 1133: Claim C activates. It has 50 LBC, while claim A has 24 LBC, so a takeover is initiated. The takeover height for this name is set to 1133, and therefore the activation delay for all the claims becomes min(4032, floor((1133-1133) / 32)) = 0. All the claims become active. The totals for each claim are recalculated, and claim D becomes controlling because it has the highest total. # Block 1133: Claim C activates. It has 50 LBC, while claim A has 24 LBC, so a takeover is initiated. The takeover height for this name is set to 1133, and therefore the activation delay for all the claims becomes min(4032, floor((1133-1133) / 32)) = 0. All the claims become active. The totals for each claim are recalculated, and claim D becomes controlling because it has the highest total.
# State: A(10+14) is active, B(20) is active, C(50) is active, D(300) is controlling # State: A(10+14) is active, B(20) is active, C(50) is active, D(300) is controlling
await self.generate(1) await self.generate(1)
self.assertEqual(1133, self.conductor.spv_node.server.bp.db.db_height) self.assertEqual(1133, self.conductor.spv_node.server.db.db_height)
claim_D, _, _, _ = await self.conductor.spv_node.server.bp.db.resolve(f"{name}:{claim_id_D}") claim_D, _, _, _ = await self.conductor.spv_node.server.db.resolve(f"{name}:{claim_id_D}")
self.assertEqual(True, claim_D.is_controlling) self.assertEqual(True, claim_D.is_controlling)
self.assertEqual(1133, claim_D.last_takeover_height) self.assertEqual(1133, claim_D.last_takeover_height)
self.assertEqual(1133, claim_D.activation_height) self.assertEqual(1133, claim_D.activation_height)
@ -1407,12 +1407,12 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
second_claim_id = (await self.stream_create(name, '0.01', allow_duplicate_name=True))['outputs'][0]['claim_id'] second_claim_id = (await self.stream_create(name, '0.01', allow_duplicate_name=True))['outputs'][0]['claim_id']
await self.assertNoClaim(second_claim_id) await self.assertNoClaim(second_claim_id)
self.assertEqual( self.assertEqual(
len((await self.conductor.spv_node.server.bp.db.search_index.search(claim_name=name))[0]), 1 len((await self.conductor.spv_node.server.session_manager.search_index.search(claim_name=name))[0]), 1
) )
await self.generate(1) await self.generate(1)
await self.assertMatchClaim(second_claim_id) await self.assertMatchClaim(second_claim_id)
self.assertEqual( self.assertEqual(
len((await self.conductor.spv_node.server.bp.db.search_index.search(claim_name=name))[0]), 2 len((await self.conductor.spv_node.server.session_manager.search_index.search(claim_name=name))[0]), 2
) )
async def test_abandon_controlling_same_block_as_new_claim(self): async def test_abandon_controlling_same_block_as_new_claim(self):
@ -1428,7 +1428,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
async def test_trending(self): async def test_trending(self):
async def get_trending_score(claim_id): async def get_trending_score(claim_id):
return (await self.conductor.spv_node.server.bp.db.search_index.search( return (await self.conductor.spv_node.server.session_manager.search_index.search(
claim_id=claim_id claim_id=claim_id
))[0][0]['trending_score'] ))[0][0]['trending_score']
@ -1456,7 +1456,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
) )
await self.generate(1) await self.generate(1)
self.assertEqual(-174.951347102643, await get_trending_score(claim_id1)) self.assertEqual(-174.951347102643, await get_trending_score(claim_id1))
search_results = (await self.conductor.spv_node.server.bp.db.search_index.search(claim_name="derp"))[0] search_results = (await self.conductor.spv_node.server.session_manager.search_index.search(claim_name="derp"))[0]
self.assertEqual(1, len(search_results)) self.assertEqual(1, len(search_results))
self.assertListEqual([claim_id1], [c['claim_id'] for c in search_results]) self.assertListEqual([claim_id1], [c['claim_id'] for c in search_results])
@ -1465,22 +1465,31 @@ class ResolveAfterReorg(BaseResolveTestCase):
async def reorg(self, start): async def reorg(self, start):
blocks = self.ledger.headers.height - start blocks = self.ledger.headers.height - start
self.blockchain.block_expected = start - 1 self.blockchain.block_expected = start - 1
prepare = self.ledger.on_header.where(self.blockchain.is_expected_block)
self.conductor.spv_node.server.synchronized.clear()
# go back to start # go back to start
await self.blockchain.invalidate_block((await self.ledger.headers.hash(start)).decode()) print('invalidate block', await self.blockchain.invalidate_block((await self.ledger.headers.hash(start)).decode()))
# go to previous + 1 # go to previous + 1
await self.generate(blocks + 2) await self.blockchain.generate(blocks + 2)
await prepare # no guarantee that it didn't happen already, so start waiting from before calling generate
await self.conductor.spv_node.server.synchronized.wait()
# await asyncio.wait_for(self.on_header(self.blockchain.block_expected), 30.0)
async def assertBlockHash(self, height): async def assertBlockHash(self, height):
bp = self.conductor.spv_node.server.bp reader_db = self.conductor.spv_node.server.db
block_hash = await self.blockchain.get_block_hash(height) block_hash = await self.blockchain.get_block_hash(height)
self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode()) self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode())
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex()) self.assertEqual(block_hash, (await reader_db.fs_block_hashes(height, 1))[0][::-1].hex())
txids = [ txids = [
tx_hash[::-1].hex() for tx_hash in bp.db.get_block_txs(height) tx_hash[::-1].hex() for tx_hash in reader_db.get_block_txs(height)
] ]
txs = await bp.db.get_transactions_and_merkles(txids) txs = await reader_db.get_transactions_and_merkles(txids)
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx'] block_txs = (await self.conductor.spv_node.server.daemon.deserialised_block(block_hash))['tx']
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions') self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order') self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')
@ -1491,9 +1500,18 @@ class ResolveAfterReorg(BaseResolveTestCase):
channel_id = self.get_claim_id( channel_id = self.get_claim_id(
await self.channel_create(channel_name, '0.01') await self.channel_create(channel_name, '0.01')
) )
self.assertEqual(channel_id, (await self.assertMatchWinningClaim(channel_name)).claim_hash.hex())
await self.assertNameState(
height=207, name='@abc', winning_claim_id=channel_id, last_takeover_height=207,
non_winning_claims=[]
)
await self.reorg(206) await self.reorg(206)
self.assertEqual(channel_id, (await self.assertMatchWinningClaim(channel_name)).claim_hash.hex())
await self.assertNameState(
height=208, name='@abc', winning_claim_id=channel_id, last_takeover_height=207,
non_winning_claims=[]
)
# await self.assertNoClaimForName(channel_name) # await self.assertNoClaimForName(channel_name)
# self.assertNotIn('error', await self.resolve(channel_name)) # self.assertNotIn('error', await self.resolve(channel_name))
@ -1502,16 +1520,29 @@ class ResolveAfterReorg(BaseResolveTestCase):
stream_id = self.get_claim_id( stream_id = self.get_claim_id(
await self.stream_create(stream_name, '0.01', channel_id=channel_id) await self.stream_create(stream_name, '0.01', channel_id=channel_id)
) )
self.assertEqual(stream_id, (await self.assertMatchWinningClaim(stream_name)).claim_hash.hex())
await self.assertNameState(
height=209, name=stream_name, winning_claim_id=stream_id, last_takeover_height=209,
non_winning_claims=[]
)
await self.reorg(206) await self.reorg(206)
self.assertEqual(stream_id, (await self.assertMatchWinningClaim(stream_name)).claim_hash.hex()) await self.assertNameState(
height=210, name=stream_name, winning_claim_id=stream_id, last_takeover_height=209,
non_winning_claims=[]
)
await self.support_create(stream_id, '0.01') await self.support_create(stream_id, '0.01')
self.assertNotIn('error', await self.resolve(stream_name))
self.assertEqual(stream_id, (await self.assertMatchWinningClaim(stream_name)).claim_hash.hex()) await self.assertNameState(
height=211, name=stream_name, winning_claim_id=stream_id, last_takeover_height=209,
non_winning_claims=[]
)
await self.reorg(206) await self.reorg(206)
# self.assertNotIn('error', await self.resolve(stream_name)) # self.assertNotIn('error', await self.resolve(stream_name))
self.assertEqual(stream_id, (await self.assertMatchWinningClaim(stream_name)).claim_hash.hex()) await self.assertNameState(
height=212, name=stream_name, winning_claim_id=stream_id, last_takeover_height=209,
non_winning_claims=[]
)
await self.stream_abandon(stream_id) await self.stream_abandon(stream_id)
self.assertNotIn('error', await self.resolve(channel_name)) self.assertNotIn('error', await self.resolve(channel_name))
@ -1553,7 +1584,6 @@ class ResolveAfterReorg(BaseResolveTestCase):
await self.ledger.wait(broadcast_tx) await self.ledger.wait(broadcast_tx)
await self.support_create(still_valid.outputs[0].claim_id, '0.01') await self.support_create(still_valid.outputs[0].claim_id, '0.01')
# await self.generate(1)
await self.ledger.wait(broadcast_tx, self.blockchain.block_expected) await self.ledger.wait(broadcast_tx, self.blockchain.block_expected)
self.assertEqual(self.ledger.headers.height, 208) self.assertEqual(self.ledger.headers.height, 208)
await self.assertBlockHash(208) await self.assertBlockHash(208)
@ -1574,7 +1604,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
await self.blockchain.generate(2) await self.blockchain.generate(2)
# wait for the client to catch up and verify the reorg # wait for the client to catch up and verify the reorg
await asyncio.wait_for(self.on_header(209), 3.0) await asyncio.wait_for(self.on_header(209), 30.0)
await self.assertBlockHash(207) await self.assertBlockHash(207)
await self.assertBlockHash(208) await self.assertBlockHash(208)
await self.assertBlockHash(209) await self.assertBlockHash(209)
@ -1603,7 +1633,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
await self.blockchain.generate(1) await self.blockchain.generate(1)
# wait for the client to catch up # wait for the client to catch up
await asyncio.wait_for(self.on_header(210), 1.0) await asyncio.wait_for(self.on_header(210), 30.0)
# verify the claim is in the new block and that it is returned by claim_search # verify the claim is in the new block and that it is returned by claim_search
republished = await self.resolve('hovercraft') republished = await self.resolve('hovercraft')
@ -1653,7 +1683,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
await self.blockchain.generate(2) await self.blockchain.generate(2)
# wait for the client to catch up and verify the reorg # wait for the client to catch up and verify the reorg
await asyncio.wait_for(self.on_header(209), 3.0) await asyncio.wait_for(self.on_header(209), 30.0)
await self.assertBlockHash(207) await self.assertBlockHash(207)
await self.assertBlockHash(208) await self.assertBlockHash(208)
await self.assertBlockHash(209) await self.assertBlockHash(209)
@ -1682,7 +1712,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
await self.blockchain.generate(1) await self.blockchain.generate(1)
# wait for the client to catch up # wait for the client to catch up
await asyncio.wait_for(self.on_header(210), 1.0) await asyncio.wait_for(self.on_header(210), 10.0)
# verify the claim is in the new block and that it is returned by claim_search # verify the claim is in the new block and that it is returned by claim_search
republished = await self.resolve('hovercraft') republished = await self.resolve('hovercraft')

View file

@ -23,7 +23,7 @@ class BasicTransactionTest(IntegrationTestCase):
)) ))
sendtxid1 = await self.blockchain.send_to_address(address1, 5) sendtxid1 = await self.blockchain.send_to_address(address1, 5)
sendtxid2 = await self.blockchain.send_to_address(address2, 5) sendtxid2 = await self.blockchain.send_to_address(address2, 5)
await self.blockchain.generate(1) await self.generate(1)
await notifications await notifications
self.assertEqual(d2l(await self.account.get_balance()), '10.0') self.assertEqual(d2l(await self.account.get_balance()), '10.0')
@ -57,7 +57,7 @@ class BasicTransactionTest(IntegrationTestCase):
notifications = asyncio.create_task(asyncio.wait( notifications = asyncio.create_task(asyncio.wait(
[asyncio.ensure_future(self.ledger.wait(channel_tx)), asyncio.ensure_future(self.ledger.wait(stream_tx))] [asyncio.ensure_future(self.ledger.wait(channel_tx)), asyncio.ensure_future(self.ledger.wait(stream_tx))]
)) ))
await self.blockchain.generate(1) await self.generate(1)
await notifications await notifications
self.assertEqual(d2l(await self.account.get_balance()), '7.985786') self.assertEqual(d2l(await self.account.get_balance()), '7.985786')
self.assertEqual(d2l(await self.account.get_balance(include_claims=True)), '9.985786') self.assertEqual(d2l(await self.account.get_balance(include_claims=True)), '9.985786')
@ -70,7 +70,7 @@ class BasicTransactionTest(IntegrationTestCase):
await self.broadcast(abandon_tx) await self.broadcast(abandon_tx)
await notify await notify
notify = asyncio.create_task(self.ledger.wait(abandon_tx)) notify = asyncio.create_task(self.ledger.wait(abandon_tx))
await self.blockchain.generate(1) await self.generate(1)
await notify await notify
response = await self.ledger.resolve([], ['lbry://@bar/foo']) response = await self.ledger.resolve([], ['lbry://@bar/foo'])

View file

@ -9,9 +9,8 @@ from lbry.wallet.manager import WalletManager
class BasicTransactionTests(IntegrationTestCase): class BasicTransactionTests(IntegrationTestCase):
async def test_variety_of_transactions_and_longish_history(self): async def test_variety_of_transactions_and_longish_history(self):
await self.blockchain.generate(300) await self.generate(300)
await self.assertBalance(self.account, '0.0') await self.assertBalance(self.account, '0.0')
addresses = await self.account.receiving.get_addresses() addresses = await self.account.receiving.get_addresses()
@ -57,7 +56,7 @@ class BasicTransactionTests(IntegrationTestCase):
for tx in await self.ledger.db.get_transactions(txid__in=[tx.id for tx in txs]) for tx in await self.ledger.db.get_transactions(txid__in=[tx.id for tx in txs])
])) ]))
await self.blockchain.generate(1) await self.generate(1)
await asyncio.wait([self.ledger.wait(tx) for tx in txs]) await asyncio.wait([self.ledger.wait(tx) for tx in txs])
await self.assertBalance(self.account, '199.99876') await self.assertBalance(self.account, '199.99876')
@ -74,7 +73,7 @@ class BasicTransactionTests(IntegrationTestCase):
) )
await self.broadcast(tx) await self.broadcast(tx)
await self.ledger.wait(tx) await self.ledger.wait(tx)
await self.blockchain.generate(1) await self.generate(1)
await self.ledger.wait(tx) await self.ledger.wait(tx)
self.assertEqual(2, await self.account.get_utxo_count()) # 199 + change self.assertEqual(2, await self.account.get_utxo_count()) # 199 + change
@ -92,7 +91,7 @@ class BasicTransactionTests(IntegrationTestCase):
self.blockchain.send_to_address(address, 1.1) for address in addresses[:5] self.blockchain.send_to_address(address, 1.1) for address in addresses[:5]
)) ))
await asyncio.wait([self.on_transaction_id(txid) for txid in txids]) # mempool await asyncio.wait([self.on_transaction_id(txid) for txid in txids]) # mempool
await self.blockchain.generate(1) await self.generate(1)
await asyncio.wait([self.on_transaction_id(txid) for txid in txids]) # confirmed await asyncio.wait([self.on_transaction_id(txid) for txid in txids]) # confirmed
await self.assertBalance(account1, '5.5') await self.assertBalance(account1, '5.5')
await self.assertBalance(account2, '0.0') await self.assertBalance(account2, '0.0')
@ -107,7 +106,7 @@ class BasicTransactionTests(IntegrationTestCase):
) )
await self.broadcast(tx) await self.broadcast(tx)
await self.ledger.wait(tx) # mempool await self.ledger.wait(tx) # mempool
await self.blockchain.generate(1) await self.generate(1)
await self.ledger.wait(tx) # confirmed await self.ledger.wait(tx) # confirmed
await self.assertBalance(account1, '3.499802') await self.assertBalance(account1, '3.499802')
@ -121,7 +120,7 @@ class BasicTransactionTests(IntegrationTestCase):
) )
await self.broadcast(tx) await self.broadcast(tx)
await self.ledger.wait(tx) # mempool await self.ledger.wait(tx) # mempool
await self.blockchain.generate(1) await self.generate(1)
await self.ledger.wait(tx) # confirmed await self.ledger.wait(tx) # confirmed
tx = (await account1.get_transactions(include_is_my_input=True, include_is_my_output=True))[1] tx = (await account1.get_transactions(include_is_my_input=True, include_is_my_output=True))[1]
@ -133,11 +132,11 @@ class BasicTransactionTests(IntegrationTestCase):
self.assertTrue(tx.outputs[1].is_internal_transfer) self.assertTrue(tx.outputs[1].is_internal_transfer)
async def test_history_edge_cases(self): async def test_history_edge_cases(self):
await self.blockchain.generate(300) await self.generate(300)
await self.assertBalance(self.account, '0.0') await self.assertBalance(self.account, '0.0')
address = await self.account.receiving.get_or_create_usable_address() address = await self.account.receiving.get_or_create_usable_address()
# evil trick: mempool is unsorted on real life, but same order between python instances. reproduce it # evil trick: mempool is unsorted on real life, but same order between python instances. reproduce it
original_summary = self.conductor.spv_node.server.bp.mempool.transaction_summaries original_summary = self.conductor.spv_node.server.mempool.transaction_summaries
def random_summary(*args, **kwargs): def random_summary(*args, **kwargs):
summary = original_summary(*args, **kwargs) summary = original_summary(*args, **kwargs)
@ -146,7 +145,7 @@ class BasicTransactionTests(IntegrationTestCase):
while summary == ordered: while summary == ordered:
random.shuffle(summary) random.shuffle(summary)
return summary return summary
self.conductor.spv_node.server.bp.mempool.transaction_summaries = random_summary self.conductor.spv_node.server.mempool.transaction_summaries = random_summary
# 10 unconfirmed txs, all from blockchain wallet # 10 unconfirmed txs, all from blockchain wallet
sends = [self.blockchain.send_to_address(address, 10) for _ in range(10)] sends = [self.blockchain.send_to_address(address, 10) for _ in range(10)]
# use batching to reduce issues with send_to_address on cli # use batching to reduce issues with send_to_address on cli
@ -199,7 +198,7 @@ class BasicTransactionTests(IntegrationTestCase):
async def test_sqlite_coin_chooser(self): async def test_sqlite_coin_chooser(self):
wallet_manager = WalletManager([self.wallet], {self.ledger.get_id(): self.ledger}) wallet_manager = WalletManager([self.wallet], {self.ledger.get_id(): self.ledger})
await self.blockchain.generate(300) await self.generate(300)
await self.assertBalance(self.account, '0.0') await self.assertBalance(self.account, '0.0')
address = await self.account.receiving.get_or_create_usable_address() address = await self.account.receiving.get_or_create_usable_address()

View file

@ -126,28 +126,28 @@ class TestRevertablePrefixDB(unittest.TestCase):
self.assertIsNone(self.db.claim_takeover.get(name)) self.assertIsNone(self.db.claim_takeover.get(name))
self.assertEqual(10000000, self.db.claim_takeover.get_pending(name).height) self.assertEqual(10000000, self.db.claim_takeover.get_pending(name).height)
self.db.commit(10000000) self.db.commit(10000000, b'\x00' * 32)
self.assertEqual(10000000, self.db.claim_takeover.get(name).height) self.assertEqual(10000000, self.db.claim_takeover.get(name).height)
self.db.claim_takeover.stage_delete((name,), (claim_hash1, takeover_height)) self.db.claim_takeover.stage_delete((name,), (claim_hash1, takeover_height))
self.db.claim_takeover.stage_put((name,), (claim_hash2, takeover_height + 1)) self.db.claim_takeover.stage_put((name,), (claim_hash2, takeover_height + 1))
self.db.claim_takeover.stage_delete((name,), (claim_hash2, takeover_height + 1)) self.db.claim_takeover.stage_delete((name,), (claim_hash2, takeover_height + 1))
self.db.commit(10000001) self.db.commit(10000001, b'\x01' * 32)
self.assertIsNone(self.db.claim_takeover.get(name)) self.assertIsNone(self.db.claim_takeover.get(name))
self.db.claim_takeover.stage_put((name,), (claim_hash3, takeover_height + 2)) self.db.claim_takeover.stage_put((name,), (claim_hash3, takeover_height + 2))
self.db.commit(10000002) self.db.commit(10000002, b'\x02' * 32)
self.assertEqual(10000002, self.db.claim_takeover.get(name).height) self.assertEqual(10000002, self.db.claim_takeover.get(name).height)
self.db.claim_takeover.stage_delete((name,), (claim_hash3, takeover_height + 2)) self.db.claim_takeover.stage_delete((name,), (claim_hash3, takeover_height + 2))
self.db.claim_takeover.stage_put((name,), (claim_hash2, takeover_height + 3)) self.db.claim_takeover.stage_put((name,), (claim_hash2, takeover_height + 3))
self.db.commit(10000003) self.db.commit(10000003, b'\x03' * 32)
self.assertEqual(10000003, self.db.claim_takeover.get(name).height) self.assertEqual(10000003, self.db.claim_takeover.get(name).height)
self.db.rollback(10000003) self.db.rollback(10000003, b'\x03' * 32)
self.assertEqual(10000002, self.db.claim_takeover.get(name).height) self.assertEqual(10000002, self.db.claim_takeover.get(name).height)
self.db.rollback(10000002) self.db.rollback(10000002, b'\x02' * 32)
self.assertIsNone(self.db.claim_takeover.get(name)) self.assertIsNone(self.db.claim_takeover.get(name))
self.db.rollback(10000001) self.db.rollback(10000001, b'\x01' * 32)
self.assertEqual(10000000, self.db.claim_takeover.get(name).height) self.assertEqual(10000000, self.db.claim_takeover.get(name).height)
self.db.rollback(10000000) self.db.rollback(10000000, b'\x00' * 32)
self.assertIsNone(self.db.claim_takeover.get(name)) self.assertIsNone(self.db.claim_takeover.get(name))