From 37d46ecdb2695e7d04a637a79af769192a8b5417 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 9 Jan 2020 23:02:16 -0500 Subject: [PATCH 1/5] fix looping over same things in _transaction_io --- lbry/wallet/database.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/lbry/wallet/database.py b/lbry/wallet/database.py index ab629bdac..eec048a2d 100644 --- a/lbry/wallet/database.py +++ b/lbry/wallet/database.py @@ -416,7 +416,7 @@ class Database(SQLiteMixin): 'height': tx.height, 'position': tx.position, 'is_verified': tx.is_verified }, 'txid = ?', (tx.id,))) - def _transaction_io(self, conn: sqlite3.Connection, tx: Transaction, address, txhash, history): + def _transaction_io(self, conn: sqlite3.Connection, tx: Transaction, address, txhash): conn.execute(*self._insert_sql('tx', self.tx_to_row(tx), replace=True)) for txo in tx.outputs: @@ -438,18 +438,20 @@ class Database(SQLiteMixin): 'address': address, }, ignore_duplicate=True)).fetchall() - conn.execute( - "UPDATE pubkey_address SET history = ?, used_times = ? WHERE address = ?", - (history, history.count(':') // 2, address) - ) - def save_transaction_io(self, tx: Transaction, address, txhash, history): - return self.db.run(self._transaction_io, tx, address, txhash, history) + return self.save_transaction_io_batch([tx], address, txhash, history) def save_transaction_io_batch(self, txs: Iterable[Transaction], address, txhash, history): + history_count = history.count(':') // 2 + def __many(conn): for tx in txs: - self._transaction_io(conn, tx, address, txhash, history) + self._transaction_io(conn, tx, address, txhash) + conn.execute( + "UPDATE pubkey_address SET history = ?, used_times = ? WHERE address = ?", + (history, history_count, address) + ).fetchall() + return self.db.run(__many) async def reserve_outputs(self, txos, is_reserved=True): From 0ee7870bdf79c4c9b1f2c8b17b7a8c2b9921f878 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 9 Jan 2020 23:02:58 -0500 Subject: [PATCH 2/5] defaultdict --- lbry/wallet/ledger.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 7e9685ee6..314903eea 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -7,9 +7,9 @@ from io import StringIO from datetime import datetime from functools import partial from operator import itemgetter -from collections import namedtuple +from collections import namedtuple, defaultdict from binascii import hexlify, unhexlify -from typing import Dict, Tuple, Type, Iterable, List, Optional +from typing import Dict, Tuple, Type, Iterable, List, Optional, DefaultDict import pylru from lbry.schema.result import Outputs @@ -154,7 +154,7 @@ class Ledger(metaclass=LedgerRegistry): self._update_tasks = TaskGroup() self._utxo_reservation_lock = asyncio.Lock() self._header_processing_lock = asyncio.Lock() - self._address_update_locks: Dict[str, asyncio.Lock] = {} + self._address_update_locks: DefaultDict[str, asyncio.Lock] = defaultdict(asyncio.Lock) self.coin_selection_strategy = None self._known_addresses_out_of_sync = set() @@ -458,10 +458,8 @@ class Ledger(metaclass=LedgerRegistry): address, remote_status = update self._update_tasks.add(self.update_history(address, remote_status)) - async def update_history(self, address, remote_status, - address_manager: AddressManager = None): - - async with self._address_update_locks.setdefault(address, asyncio.Lock()): + async def update_history(self, address, remote_status, address_manager: AddressManager = None): + async with self._address_update_locks[address]: self._known_addresses_out_of_sync.discard(address) local_status, local_history = await self.get_local_status_and_history(address) From 38b108752e49661222573accfba3d1328fc4e182 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 9 Jan 2020 23:05:49 -0500 Subject: [PATCH 3/5] batched blockchain.address.subscribe --- lbry/wallet/ledger.py | 34 ++++++++++++++++++++++++++-------- lbry/wallet/network.py | 5 +++-- lbry/wallet/server/session.py | 10 +++++++--- 3 files changed, 36 insertions(+), 13 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 314903eea..a3fbdf257 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -425,6 +425,7 @@ class Ledger(metaclass=LedgerRegistry): async def subscribe_accounts(self): if self.network.is_connected and self.accounts: + log.info("Subscribe to %i accounts", len(self.accounts)) await asyncio.wait([ self.subscribe_account(a) for a in self.accounts ]) @@ -444,15 +445,32 @@ class Ledger(metaclass=LedgerRegistry): AddressesGeneratedEvent(address_manager, addresses) ) - async def subscribe_addresses(self, address_manager: AddressManager, addresses: List[str]): + async def subscribe_addresses(self, address_manager: AddressManager, addresses: List[str], batch_size: int = 1000): if self.network.is_connected and addresses: - await asyncio.wait([ - self.subscribe_address(address_manager, address) for address in addresses - ]) - - async def subscribe_address(self, address_manager: AddressManager, address: str): - remote_status = await self.network.subscribe_address(address) - self._update_tasks.add(self.update_history(address, remote_status, address_manager)) + addresses_remaining = list(addresses) + retries = 0 + while addresses_remaining: + batch = addresses_remaining[:batch_size] + try: + results = await self.network.rpc('blockchain.address.subscribe', batch, True) + for address, remote_status in zip(batch, results): + self._update_tasks.add(self.update_history(address, remote_status, address_manager)) + retries = 0 + addresses_remaining = addresses_remaining[batch_size:] + log.info("subscribed to %i/%i addresses", len(addresses) - len(addresses_remaining), len(addresses)) + except asyncio.TimeoutError: + if retries >= 3: + log.warning( + "timed out subscribing to addresses from %s:%i", + *self.network.client.server_address_and_port + ) + # abort and cancel, we can't lose a subscription, it will happen again on reconnect + if self.network.client: + self.network.client.abort() + raise asyncio.CancelledError() + await asyncio.sleep(1) + retries += 1 + log.info("finished subscribing to %i addresses", len(addresses)) def process_status_update(self, update): address, remote_status = update diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index 542408d87..b3ae4b1b7 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -256,9 +256,10 @@ class Network: def subscribe_headers(self): return self.rpc('blockchain.headers.subscribe', [True], True) - async def subscribe_address(self, address): + async def subscribe_address(self, address, *addresses): + addresses = list((address, ) + addresses) try: - return await self.rpc('blockchain.address.subscribe', [address], True) + return await self.rpc('blockchain.address.subscribe', addresses, True) except asyncio.TimeoutError: # abort and cancel, we can't lose a subscription, it will happen again on reconnect if self.client: diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 9cca31d2b..ee4686284 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -1141,12 +1141,16 @@ class LBRYElectrumX(SessionBase): hashX = self.address_to_hashX(address) return await self.hashX_listunspent(hashX) - async def address_subscribe(self, address): + async def address_subscribe(self, *addresses): """Subscribe to an address. address: the address to subscribe to""" - hashX = self.address_to_hashX(address) - return await self.hashX_subscribe(hashX, address) + if len(addresses) > 1000: + raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}') + hashXes = [ + (self.address_to_hashX(address), address) for address in addresses + ] + return await asyncio.gather(*(self.hashX_subscribe(*args) for args in hashXes)) async def address_unsubscribe(self, address): """Unsubscribe an address. From 08f652055751e6826f4b91fbde4c86f01b670715 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 10 Jan 2020 12:27:56 -0500 Subject: [PATCH 4/5] bare excepts --- lbry/dht/blob_announcer.py | 2 +- lbry/extras/daemon/components.py | 2 +- lbry/stream/stream_manager.py | 2 +- lbry/wallet/ledger.py | 12 +++++++++--- tests/unit/stream/test_stream_manager.py | 2 +- 5 files changed, 13 insertions(+), 7 deletions(-) diff --git a/lbry/dht/blob_announcer.py b/lbry/dht/blob_announcer.py index 345bbdc9f..5d977aff4 100644 --- a/lbry/dht/blob_announcer.py +++ b/lbry/dht/blob_announcer.py @@ -24,7 +24,7 @@ class BlobAnnouncer: else: log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers) except Exception as err: - if isinstance(err, asyncio.CancelledError): + if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8 raise err log.warning("error announcing %s: %s", blob_hash[:8], str(err)) diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index 0d0ed7e47..617504b7d 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -376,7 +376,7 @@ class UPnPComponent(Component): self.upnp = await UPnP.discover(loop=self.component_manager.loop) log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string) except Exception as err: - if isinstance(err, asyncio.CancelledError): + if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8 raise log.warning("upnp discovery failed: %s", err) self.upnp = None diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index cf3a28eb3..a3e422817 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -371,7 +371,7 @@ class StreamManager: except asyncio.TimeoutError: raise ResolveTimeoutError(uri) except Exception as err: - if isinstance(err, asyncio.CancelledError): + if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8 raise log.exception("Unexpected error resolving stream:") raise ResolveError(f"Unexpected error resolving stream: {str(err)}") diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index a3fbdf257..9847b03e3 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -701,7 +701,9 @@ class Ledger(metaclass=LedgerRegistry): "%d change addresses (gap: %d), %d channels, %d certificates and %d claims. ", account.id, balance, total_receiving, account.receiving.gap, total_change, account.change.gap, channel_count, len(account.channel_keys), claim_count) - except: # pylint: disable=bare-except + except Exception as err: + if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8 + raise log.exception( 'Failed to display wallet state, please file issue ' 'for this bug along with the traceback you see below:') @@ -724,7 +726,9 @@ class Ledger(metaclass=LedgerRegistry): claim_ids = [p.purchased_claim_id for p in purchases] try: resolved, _, _ = await self.claim_search([], claim_ids=claim_ids) - except: # pylint: disable=bare-except + except Exception as err: + if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8 + raise log.exception("Resolve failed while looking up purchased claim ids:") resolved = [] lookup = {claim.claim_id: claim for claim in resolved} @@ -757,7 +761,9 @@ class Ledger(metaclass=LedgerRegistry): claim_ids = collection.claim.collection.claims.ids[offset:page_size+offset] try: resolve_results, _, _ = await self.claim_search([], claim_ids=claim_ids) - except: # pylint: disable=bare-except + except Exception as err: + if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8 + raise log.exception("Resolve failed while looking up collection claim ids:") return [] claims = [] diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py index 85bdb4569..65d35e945 100644 --- a/tests/unit/stream/test_stream_manager.py +++ b/tests/unit/stream/test_stream_manager.py @@ -320,7 +320,7 @@ class TestStreamManager(BlobExchangeTestBase): try: await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager, timeout) except Exception as err: - if isinstance(err, asyncio.CancelledError): + if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8 raise error = err self.assertEqual(expected_error, type(error)) From 0bb4cdadd9b792c6d0772ff1c072ba38e36a17a7 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 10 Jan 2020 13:57:52 -0500 Subject: [PATCH 5/5] use network.subscribe_address --- lbry/wallet/ledger.py | 31 ++++++++++--------------------- lbry/wallet/network.py | 4 ++++ 2 files changed, 14 insertions(+), 21 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 9847b03e3..ce950f7a4 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -448,29 +448,18 @@ class Ledger(metaclass=LedgerRegistry): async def subscribe_addresses(self, address_manager: AddressManager, addresses: List[str], batch_size: int = 1000): if self.network.is_connected and addresses: addresses_remaining = list(addresses) - retries = 0 while addresses_remaining: batch = addresses_remaining[:batch_size] - try: - results = await self.network.rpc('blockchain.address.subscribe', batch, True) - for address, remote_status in zip(batch, results): - self._update_tasks.add(self.update_history(address, remote_status, address_manager)) - retries = 0 - addresses_remaining = addresses_remaining[batch_size:] - log.info("subscribed to %i/%i addresses", len(addresses) - len(addresses_remaining), len(addresses)) - except asyncio.TimeoutError: - if retries >= 3: - log.warning( - "timed out subscribing to addresses from %s:%i", - *self.network.client.server_address_and_port - ) - # abort and cancel, we can't lose a subscription, it will happen again on reconnect - if self.network.client: - self.network.client.abort() - raise asyncio.CancelledError() - await asyncio.sleep(1) - retries += 1 - log.info("finished subscribing to %i addresses", len(addresses)) + results = await self.network.subscribe_address(*batch) + for address, remote_status in zip(batch, results): + self._update_tasks.add(self.update_history(address, remote_status, address_manager)) + addresses_remaining = addresses_remaining[batch_size:] + log.info("subscribed to %i/%i addresses on %s:%i", len(addresses) - len(addresses_remaining), + len(addresses), *self.network.client.server_address_and_port) + log.info( + "finished subscribing to %i addresses on %s:%i", len(addresses), + *self.network.client.server_address_and_port + ) def process_status_update(self, update): address, remote_status = update diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index b3ae4b1b7..672b2938d 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -261,6 +261,10 @@ class Network: try: return await self.rpc('blockchain.address.subscribe', addresses, True) except asyncio.TimeoutError: + log.warning( + "timed out subscribing to addresses from %s:%i", + *self.client.server_address_and_port + ) # abort and cancel, we can't lose a subscription, it will happen again on reconnect if self.client: self.client.abort()