diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 70245bb14..bc48be52a 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -807,8 +807,8 @@ class LevelDB: return self.prefix_db = HubDB( - os.path.join(self.env.db_dir, 'lbry-leveldb'), self.env.reorg_limit, self.env.cache_MB, - max_open_files=512 + os.path.join(self.env.db_dir, 'lbry-leveldb'), cache_mb=self.env.cache_MB, + reorg_limit=self.env.reorg_limit, max_open_files=512 ) self.logger.info(f'opened db: lbry-leveldb') diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 5e3e94662..3983756be 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -636,12 +636,14 @@ class SessionManager: None, touched.intersection_update, self.hashx_subscriptions_by_session.keys() ) - if touched or (height_changed and self.mempool_statuses): + if touched or new_touched or (height_changed and self.mempool_statuses): notified_hashxs = 0 session_hashxes_to_notify = defaultdict(list) to_notify = touched if height_changed else new_touched for hashX in to_notify: + if hashX not in self.hashx_subscriptions_by_session: + continue for session_id in self.hashx_subscriptions_by_session[hashX]: session_hashxes_to_notify[session_id].append(hashX) notified_hashxs += 1 diff --git a/tests/integration/transactions/test_internal_transaction_api.py b/tests/integration/transactions/test_internal_transaction_api.py index 6eba5e229..7f0f0c161 100644 --- a/tests/integration/transactions/test_internal_transaction_api.py +++ b/tests/integration/transactions/test_internal_transaction_api.py @@ -17,13 +17,14 @@ class BasicTransactionTest(IntegrationTestCase): await self.account.ensure_address_gap() address1, address2 = await self.account.receiving.get_addresses(limit=2, only_usable=True) + notifications = asyncio.create_task(asyncio.wait( + [asyncio.ensure_future(self.on_address_update(address1)), + asyncio.ensure_future(self.on_address_update(address2))] + )) sendtxid1 = await self.blockchain.send_to_address(address1, 5) sendtxid2 = await self.blockchain.send_to_address(address2, 5) await self.blockchain.generate(1) - await asyncio.wait([ - self.on_transaction_id(sendtxid1), - self.on_transaction_id(sendtxid2) - ]) + await notifications self.assertEqual(d2l(await self.account.get_balance()), '10.0') @@ -44,18 +45,18 @@ class BasicTransactionTest(IntegrationTestCase): stream_txo.sign(channel_txo) await stream_tx.sign([self.account]) + notifications = asyncio.create_task(asyncio.wait( + [asyncio.ensure_future(self.ledger.wait(channel_tx)), asyncio.ensure_future(self.ledger.wait(stream_tx))] + )) + await self.broadcast(channel_tx) await self.broadcast(stream_tx) - await asyncio.wait([ # mempool - self.ledger.wait(channel_tx), - self.ledger.wait(stream_tx) - ]) + await notifications + notifications = asyncio.create_task(asyncio.wait( + [asyncio.ensure_future(self.ledger.wait(channel_tx)), asyncio.ensure_future(self.ledger.wait(stream_tx))] + )) await self.blockchain.generate(1) - await asyncio.wait([ # confirmed - self.ledger.wait(channel_tx), - self.ledger.wait(stream_tx) - ]) - + await notifications self.assertEqual(d2l(await self.account.get_balance()), '7.985786') self.assertEqual(d2l(await self.account.get_balance(include_claims=True)), '9.985786') @@ -63,10 +64,12 @@ class BasicTransactionTest(IntegrationTestCase): self.assertEqual(response['lbry://@bar/foo'].claim.claim_type, 'stream') abandon_tx = await Transaction.create([Input.spend(stream_tx.outputs[0])], [], [self.account], self.account) + notify = asyncio.create_task(self.ledger.wait(abandon_tx)) await self.broadcast(abandon_tx) - await self.ledger.wait(abandon_tx) + await notify + notify = asyncio.create_task(self.ledger.wait(abandon_tx)) await self.blockchain.generate(1) - await self.ledger.wait(abandon_tx) + await notify response = await self.ledger.resolve([], ['lbry://@bar/foo']) self.assertIn('error', response['lbry://@bar/foo']) diff --git a/tests/integration/transactions/test_transactions.py b/tests/integration/transactions/test_transactions.py index f8d679fee..fea0b18fb 100644 --- a/tests/integration/transactions/test_transactions.py +++ b/tests/integration/transactions/test_transactions.py @@ -18,10 +18,12 @@ class BasicTransactionTests(IntegrationTestCase): # send 10 coins to first 10 receiving addresses and then 10 transactions worth 10 coins each # to the 10th receiving address for a total of 30 UTXOs on the entire account for i in range(10): + notification = asyncio.ensure_future(self.on_address_update(addresses[i])) txid = await self.blockchain.send_to_address(addresses[i], 10) - await self.wait_for_txid(addresses[i]) + await notification + notification = asyncio.ensure_future(self.on_address_update(addresses[9])) txid = await self.blockchain.send_to_address(addresses[9], 10) - await self.wait_for_txid(addresses[9]) + await notification # use batching to reduce issues with send_to_address on cli await self.assertBalance(self.account, '200.0') @@ -174,11 +176,6 @@ class BasicTransactionTests(IntegrationTestCase): self.assertEqual(21, len((await self.ledger.get_local_status_and_history(address))[1])) self.assertEqual(0, len(self.ledger._known_addresses_out_of_sync)) - def wait_for_txid(self, address): - return asyncio.ensure_future(self.ledger.on_transaction.where( - lambda e: e.address == address - )) - async def _test_transaction(self, send_amount, address, inputs, change): tx = await Transaction.create( [], [Output.pay_pubkey_hash(send_amount, self.ledger.address_to_hash160(address))], [self.account], @@ -203,30 +200,31 @@ class BasicTransactionTests(IntegrationTestCase): async def test_sqlite_coin_chooser(self): wallet_manager = WalletManager([self.wallet], {self.ledger.get_id(): self.ledger}) await self.blockchain.generate(300) + await self.assertBalance(self.account, '0.0') address = await self.account.receiving.get_or_create_usable_address() other_account = self.wallet.generate_account(self.ledger) other_address = await other_account.receiving.get_or_create_usable_address() self.ledger.coin_selection_strategy = 'sqlite' await self.ledger.subscribe_account(self.account) - accepted = self.wait_for_txid(address) + accepted = asyncio.ensure_future(self.on_address_update(address)) txid = await self.blockchain.send_to_address(address, 1.0) await accepted - accepted = self.wait_for_txid(address) + accepted = asyncio.ensure_future(self.on_address_update(address)) txid = await self.blockchain.send_to_address(address, 1.0) await accepted - accepted = self.wait_for_txid(address) + accepted = asyncio.ensure_future(self.on_address_update(address)) txid = await self.blockchain.send_to_address(address, 3.0) await accepted - accepted = self.wait_for_txid(address) + accepted = asyncio.ensure_future(self.on_address_update(address)) txid = await self.blockchain.send_to_address(address, 5.0) await accepted - accepted = self.wait_for_txid(address) + accepted = asyncio.ensure_future(self.on_address_update(address)) txid = await self.blockchain.send_to_address(address, 10.0) await accepted