This commit is contained in:
Jack Robison 2021-10-06 13:21:32 -04:00 committed by Victor Shyba
parent 1ddd29656e
commit 8031a55dbb
4 changed files with 33 additions and 30 deletions

View file

@ -807,8 +807,8 @@ class LevelDB:
return return
self.prefix_db = HubDB( self.prefix_db = HubDB(
os.path.join(self.env.db_dir, 'lbry-leveldb'), self.env.reorg_limit, self.env.cache_MB, os.path.join(self.env.db_dir, 'lbry-leveldb'), cache_mb=self.env.cache_MB,
max_open_files=512 reorg_limit=self.env.reorg_limit, max_open_files=512
) )
self.logger.info(f'opened db: lbry-leveldb') self.logger.info(f'opened db: lbry-leveldb')

View file

@ -636,12 +636,14 @@ class SessionManager:
None, touched.intersection_update, self.hashx_subscriptions_by_session.keys() None, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
) )
if touched or (height_changed and self.mempool_statuses): if touched or new_touched or (height_changed and self.mempool_statuses):
notified_hashxs = 0 notified_hashxs = 0
session_hashxes_to_notify = defaultdict(list) session_hashxes_to_notify = defaultdict(list)
to_notify = touched if height_changed else new_touched to_notify = touched if height_changed else new_touched
for hashX in to_notify: for hashX in to_notify:
if hashX not in self.hashx_subscriptions_by_session:
continue
for session_id in self.hashx_subscriptions_by_session[hashX]: for session_id in self.hashx_subscriptions_by_session[hashX]:
session_hashxes_to_notify[session_id].append(hashX) session_hashxes_to_notify[session_id].append(hashX)
notified_hashxs += 1 notified_hashxs += 1

View file

@ -17,13 +17,14 @@ class BasicTransactionTest(IntegrationTestCase):
await self.account.ensure_address_gap() await self.account.ensure_address_gap()
address1, address2 = await self.account.receiving.get_addresses(limit=2, only_usable=True) address1, address2 = await self.account.receiving.get_addresses(limit=2, only_usable=True)
notifications = asyncio.create_task(asyncio.wait(
[asyncio.ensure_future(self.on_address_update(address1)),
asyncio.ensure_future(self.on_address_update(address2))]
))
sendtxid1 = await self.blockchain.send_to_address(address1, 5) sendtxid1 = await self.blockchain.send_to_address(address1, 5)
sendtxid2 = await self.blockchain.send_to_address(address2, 5) sendtxid2 = await self.blockchain.send_to_address(address2, 5)
await self.blockchain.generate(1) await self.blockchain.generate(1)
await asyncio.wait([ await notifications
self.on_transaction_id(sendtxid1),
self.on_transaction_id(sendtxid2)
])
self.assertEqual(d2l(await self.account.get_balance()), '10.0') self.assertEqual(d2l(await self.account.get_balance()), '10.0')
@ -44,18 +45,18 @@ class BasicTransactionTest(IntegrationTestCase):
stream_txo.sign(channel_txo) stream_txo.sign(channel_txo)
await stream_tx.sign([self.account]) await stream_tx.sign([self.account])
notifications = asyncio.create_task(asyncio.wait(
[asyncio.ensure_future(self.ledger.wait(channel_tx)), asyncio.ensure_future(self.ledger.wait(stream_tx))]
))
await self.broadcast(channel_tx) await self.broadcast(channel_tx)
await self.broadcast(stream_tx) await self.broadcast(stream_tx)
await asyncio.wait([ # mempool await notifications
self.ledger.wait(channel_tx), notifications = asyncio.create_task(asyncio.wait(
self.ledger.wait(stream_tx) [asyncio.ensure_future(self.ledger.wait(channel_tx)), asyncio.ensure_future(self.ledger.wait(stream_tx))]
]) ))
await self.blockchain.generate(1) await self.blockchain.generate(1)
await asyncio.wait([ # confirmed await notifications
self.ledger.wait(channel_tx),
self.ledger.wait(stream_tx)
])
self.assertEqual(d2l(await self.account.get_balance()), '7.985786') self.assertEqual(d2l(await self.account.get_balance()), '7.985786')
self.assertEqual(d2l(await self.account.get_balance(include_claims=True)), '9.985786') self.assertEqual(d2l(await self.account.get_balance(include_claims=True)), '9.985786')
@ -63,10 +64,12 @@ class BasicTransactionTest(IntegrationTestCase):
self.assertEqual(response['lbry://@bar/foo'].claim.claim_type, 'stream') self.assertEqual(response['lbry://@bar/foo'].claim.claim_type, 'stream')
abandon_tx = await Transaction.create([Input.spend(stream_tx.outputs[0])], [], [self.account], self.account) abandon_tx = await Transaction.create([Input.spend(stream_tx.outputs[0])], [], [self.account], self.account)
notify = asyncio.create_task(self.ledger.wait(abandon_tx))
await self.broadcast(abandon_tx) await self.broadcast(abandon_tx)
await self.ledger.wait(abandon_tx) await notify
notify = asyncio.create_task(self.ledger.wait(abandon_tx))
await self.blockchain.generate(1) await self.blockchain.generate(1)
await self.ledger.wait(abandon_tx) await notify
response = await self.ledger.resolve([], ['lbry://@bar/foo']) response = await self.ledger.resolve([], ['lbry://@bar/foo'])
self.assertIn('error', response['lbry://@bar/foo']) self.assertIn('error', response['lbry://@bar/foo'])

View file

@ -18,10 +18,12 @@ class BasicTransactionTests(IntegrationTestCase):
# send 10 coins to first 10 receiving addresses and then 10 transactions worth 10 coins each # send 10 coins to first 10 receiving addresses and then 10 transactions worth 10 coins each
# to the 10th receiving address for a total of 30 UTXOs on the entire account # to the 10th receiving address for a total of 30 UTXOs on the entire account
for i in range(10): for i in range(10):
notification = asyncio.ensure_future(self.on_address_update(addresses[i]))
txid = await self.blockchain.send_to_address(addresses[i], 10) txid = await self.blockchain.send_to_address(addresses[i], 10)
await self.wait_for_txid(addresses[i]) await notification
notification = asyncio.ensure_future(self.on_address_update(addresses[9]))
txid = await self.blockchain.send_to_address(addresses[9], 10) txid = await self.blockchain.send_to_address(addresses[9], 10)
await self.wait_for_txid(addresses[9]) await notification
# use batching to reduce issues with send_to_address on cli # use batching to reduce issues with send_to_address on cli
await self.assertBalance(self.account, '200.0') await self.assertBalance(self.account, '200.0')
@ -174,11 +176,6 @@ class BasicTransactionTests(IntegrationTestCase):
self.assertEqual(21, len((await self.ledger.get_local_status_and_history(address))[1])) self.assertEqual(21, len((await self.ledger.get_local_status_and_history(address))[1]))
self.assertEqual(0, len(self.ledger._known_addresses_out_of_sync)) self.assertEqual(0, len(self.ledger._known_addresses_out_of_sync))
def wait_for_txid(self, address):
return asyncio.ensure_future(self.ledger.on_transaction.where(
lambda e: e.address == address
))
async def _test_transaction(self, send_amount, address, inputs, change): async def _test_transaction(self, send_amount, address, inputs, change):
tx = await Transaction.create( tx = await Transaction.create(
[], [Output.pay_pubkey_hash(send_amount, self.ledger.address_to_hash160(address))], [self.account], [], [Output.pay_pubkey_hash(send_amount, self.ledger.address_to_hash160(address))], [self.account],
@ -203,30 +200,31 @@ class BasicTransactionTests(IntegrationTestCase):
async def test_sqlite_coin_chooser(self): async def test_sqlite_coin_chooser(self):
wallet_manager = WalletManager([self.wallet], {self.ledger.get_id(): self.ledger}) wallet_manager = WalletManager([self.wallet], {self.ledger.get_id(): self.ledger})
await self.blockchain.generate(300) await self.blockchain.generate(300)
await self.assertBalance(self.account, '0.0') await self.assertBalance(self.account, '0.0')
address = await self.account.receiving.get_or_create_usable_address() address = await self.account.receiving.get_or_create_usable_address()
other_account = self.wallet.generate_account(self.ledger) other_account = self.wallet.generate_account(self.ledger)
other_address = await other_account.receiving.get_or_create_usable_address() other_address = await other_account.receiving.get_or_create_usable_address()
self.ledger.coin_selection_strategy = 'sqlite' self.ledger.coin_selection_strategy = 'sqlite'
await self.ledger.subscribe_account(self.account) await self.ledger.subscribe_account(self.account)
accepted = self.wait_for_txid(address)
accepted = asyncio.ensure_future(self.on_address_update(address))
txid = await self.blockchain.send_to_address(address, 1.0) txid = await self.blockchain.send_to_address(address, 1.0)
await accepted await accepted
accepted = self.wait_for_txid(address) accepted = asyncio.ensure_future(self.on_address_update(address))
txid = await self.blockchain.send_to_address(address, 1.0) txid = await self.blockchain.send_to_address(address, 1.0)
await accepted await accepted
accepted = self.wait_for_txid(address) accepted = asyncio.ensure_future(self.on_address_update(address))
txid = await self.blockchain.send_to_address(address, 3.0) txid = await self.blockchain.send_to_address(address, 3.0)
await accepted await accepted
accepted = self.wait_for_txid(address) accepted = asyncio.ensure_future(self.on_address_update(address))
txid = await self.blockchain.send_to_address(address, 5.0) txid = await self.blockchain.send_to_address(address, 5.0)
await accepted await accepted
accepted = self.wait_for_txid(address) accepted = asyncio.ensure_future(self.on_address_update(address))
txid = await self.blockchain.send_to_address(address, 10.0) txid = await self.blockchain.send_to_address(address, 10.0)
await accepted await accepted