Merge pull request #2685 from lbryio/fast_broadcast_notify

Update balance after a broadcast transaction is accepted to mempool without a delay, fix bug in `status` header progress
This commit is contained in:
Jack Robison 2019-12-19 22:07:16 -05:00 committed by GitHub
commit 9e21c52d04
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 49 additions and 29 deletions

View file

@ -128,14 +128,13 @@ class WalletComponent(Component):
if self.wallet_manager.ledger.network.remote_height:
local_height = self.wallet_manager.ledger.local_height_including_downloaded_height
disk_height = len(self.wallet_manager.ledger.headers)
download_height = local_height - disk_height if disk_height != local_height else local_height
remote_height = self.wallet_manager.ledger.network.remote_height
target_height = remote_height - disk_height if disk_height != local_height else remote_height
best_hash = self.wallet_manager.get_best_blockhash()
if not target_height:
progress = 100
if disk_height != local_height != remote_height:
download_height, target_height = local_height - disk_height, remote_height - disk_height
else:
progress = min(max(math.ceil(float(download_height) / float(target_height) * 100), 0), 100)
download_height, target_height = local_height, remote_height
progress = min(max(math.ceil(float(download_height) / float(target_height) * 100), 0), 100)
best_hash = self.wallet_manager.get_best_blockhash()
result.update({
'headers_synchronization_progress': progress,
'blocks': max(local_height, 0),

View file

@ -90,6 +90,7 @@ class CommandTestCase(IntegrationTestCase):
self.daemons = []
self.extra_wallet_nodes = []
self.extra_wallet_node_port = 5280
self.__height = 0
self.daemon = await self.add_daemon(self.wallet_node)
await self.account.ensure_address_gap()
@ -196,6 +197,7 @@ class CommandTestCase(IntegrationTestCase):
async def generate(self, blocks):
""" Ask lbrycrd to generate some blocks and wait until ledger has them. """
await self.blockchain.generate(blocks)
self.__height += 1
await self.ledger.on_header.where(self.blockchain.is_expected_block)
async def blockchain_claim_name(self, name: str, value: str, amount: str, confirm=True):
@ -223,7 +225,7 @@ class CommandTestCase(IntegrationTestCase):
if confirm:
await self.ledger.wait(tx)
await self.generate(1)
await self.ledger.wait(tx)
await self.ledger.wait(tx, self.__height)
return self.sout(tx)
def create_upload_file(self, data, prefix=None, suffix=None):

View file

@ -226,7 +226,7 @@ class LbryWalletManager(BaseWalletManager):
try:
await self.ledger.broadcast(tx)
if blocking:
await self.ledger.wait(tx)
await self.ledger.wait(tx, timeout=None)
except:
await self.ledger.release_tx(tx)
raise

View file

@ -36,16 +36,17 @@ class PurchaseCommandTests(CommandTestCase):
await self.ledger.wait(purchase)
return claim_id
async def assertStreamPurchased(self, stream: Transaction, purchase: Transaction):
stream_txo, purchase_txo = stream.outputs[0], purchase.outputs[0]
stream_fee = stream_txo.claim.stream.fee
self.assertEqual(stream_fee.dewies, purchase_txo.amount)
self.assertEqual(stream_fee.address, purchase_txo.get_address(self.ledger))
async def assertStreamPurchased(self, stream: Transaction, operation):
await self.account.release_all_outputs()
buyer_balance = await self.account.get_balance()
merchant_balance = lbc_to_dewies(str(await self.blockchain.get_balance()))
pre_purchase_count = (await self.daemon.jsonrpc_purchase_list())['total_items']
purchase = await operation()
stream_txo, purchase_txo = stream.outputs[0], purchase.outputs[0]
stream_fee = stream_txo.claim.stream.fee
self.assertEqual(stream_fee.dewies, purchase_txo.amount)
self.assertEqual(stream_fee.address, purchase_txo.get_address(self.ledger))
await self.ledger.wait(purchase)
await self.generate(1)
@ -76,8 +77,7 @@ class PurchaseCommandTests(CommandTestCase):
claim_id = stream.outputs[0].claim_id
# explicit purchase of claim
tx = await self.daemon.jsonrpc_purchase_create(claim_id)
await self.assertStreamPurchased(stream, tx)
await self.assertStreamPurchased(stream, lambda: self.daemon.jsonrpc_purchase_create(claim_id))
# check that `get` doesn't purchase it again
balance = await self.account.get_balance()
@ -88,8 +88,12 @@ class PurchaseCommandTests(CommandTestCase):
# `get` does purchase a stream we don't have yet
another_stream = await self.priced_stream('another')
response = await self.daemon.jsonrpc_get('lbry://another')
await self.assertStreamPurchased(another_stream, response.content_fee)
async def imagine_its_a_lambda():
response = await self.daemon.jsonrpc_get('lbry://another')
return response.content_fee
await self.assertStreamPurchased(another_stream, imagine_its_a_lambda)
# purchase non-existent claim fails
with self.assertRaisesRegex(Exception, "Could not find claim with claim_id"):
@ -105,13 +109,13 @@ class PurchaseCommandTests(CommandTestCase):
await self.daemon.jsonrpc_purchase_create(claim_id)
# force purchasing claim you already own
tx = await self.daemon.jsonrpc_purchase_create(claim_id, allow_duplicate_purchase=True)
await self.assertStreamPurchased(stream, tx)
await self.assertStreamPurchased(
stream, lambda: self.daemon.jsonrpc_purchase_create(claim_id, allow_duplicate_purchase=True)
)
# purchase by uri
abc_stream = await self.priced_stream('abc')
tx = await self.daemon.jsonrpc_purchase_create(url='lbry://abc')
await self.assertStreamPurchased(abc_stream, tx)
await self.assertStreamPurchased(abc_stream, lambda: self.daemon.jsonrpc_purchase_create(url='lbry://abc'))
async def test_purchase_and_transaction_list(self):
self.assertItemCount(await self.daemon.jsonrpc_purchase_list(), 0)

View file

@ -575,7 +575,7 @@ class BaseLedger(metaclass=LedgerRegistry):
# broadcast can't be a retriable call yet
return self.network.broadcast(hexlify(tx.raw).decode())
async def wait(self, tx: basetransaction.BaseTransaction, height=-1, timeout=None):
async def wait(self, tx: basetransaction.BaseTransaction, height=-1, timeout=1):
addresses = set()
for txi in tx.inputs:
if txi.txo_ref.txo is not None:
@ -593,4 +593,12 @@ class BaseLedger(metaclass=LedgerRegistry):
)) for address_record in records
], timeout=timeout)
if pending:
raise asyncio.TimeoutError('Timed out waiting for transaction.')
for record in records:
found = False
_, local_history = await self.get_local_status_and_history(None, history=record['history'])
for txid, local_height in local_history:
if txid == tx.id and local_height >= height:
found = True
if not found:
print(record['history'], addresses, tx.id)
raise asyncio.TimeoutError('Timed out waiting for transaction.')

View file

@ -232,7 +232,7 @@ class BaseNetwork:
def get_transaction(self, tx_hash, known_height=None):
# use any server if its old, otherwise restrict to who gave us the history
restricted = not known_height or 0 > known_height > self.remote_height - 10
restricted = known_height in (None, -1, 0) or 0 > known_height > self.remote_height - 10
return self.rpc('blockchain.transaction.get', [tx_hash], restricted)
def get_transaction_height(self, tx_hash, known_height=None):

View file

@ -11,7 +11,6 @@ import asyncio
import itertools
import time
from abc import ABC, abstractmethod
from asyncio import Lock, sleep
from collections import defaultdict
import attr
@ -104,7 +103,8 @@ class MemPool:
self.refresh_secs = refresh_secs
self.log_status_secs = log_status_secs
# Prevents mempool refreshes during fee histogram calculation
self.lock = Lock()
self.lock = asyncio.Lock()
self.wakeup = asyncio.Event()
async def _logging(self, synchronized_event):
"""Print regular logs of mempool stats."""
@ -117,7 +117,7 @@ class MemPool:
while True:
self.logger.info(f'{len(self.txs):,d} txs '
f'touching {len(self.hashXs):,d} addresses')
await sleep(self.log_status_secs)
await asyncio.sleep(self.log_status_secs)
await synchronized_event.wait()
async def _refresh_histogram(self, synchronized_event):
@ -126,7 +126,7 @@ class MemPool:
async with self.lock:
# Threaded as can be expensive
await asyncio.get_event_loop().run_in_executor(None, self._update_histogram, 100_000)
await sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS)
await asyncio.sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS)
def _update_histogram(self, bin_size):
# Build a histogram by fee rate
@ -212,7 +212,13 @@ class MemPool:
synchronized_event.set()
synchronized_event.clear()
await self.api.on_mempool(touched, height)
await sleep(self.refresh_secs)
try:
# we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event)
await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs)
except asyncio.TimeoutError:
pass
finally:
self.wakeup.clear()
async def _process_mempool(self, all_hashes):
# Re-sync with the new set of hashes

View file

@ -567,6 +567,7 @@ class SessionManager:
async def broadcast_transaction(self, raw_tx):
hex_hash = await self.daemon.broadcast_transaction(raw_tx)
self.mempool.wakeup.set()
self.txs_sent += 1
return hex_hash