insert_transactions and other client speed ups

Lex Berezhny 2021-01-19 12:32:05 -05:00
parent f3c1dcef81
commit 5671224fe0
7 changed files with 50 additions and 29 deletions


@@ -269,8 +269,8 @@ class Database:
     async def insert_tx_filters(self, filters):
         return await self.run(q.insert_tx_filters, filters)
 
-    async def insert_transaction(self, block_hash, tx):
-        return await self.run(q.insert_transaction, block_hash, tx)
+    async def insert_transactions(self, txs):
+        return await self.run(q.insert_transactions, txs)
 
     async def update_address_used_times(self, addresses):
         return await self.run(q.update_address_used_times, addresses)
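
The facade change above turns N awaited round-trips into one: callers now hand insert_transactions a list of (block_hash, tx) pairs. A minimal sketch of the call-site migration, where save_block_txs, db, block_hash, and txs are hypothetical stand-ins for the surrounding code:

    async def save_block_txs(db, block_hash, txs):
        # before: one executor round-trip per transaction
        #     for tx in txs:
        #         await db.insert_transaction(block_hash, tx)
        # after: one round-trip for the whole batch
        await db.insert_transactions([(block_hash, tx) for tx in txs])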


@@ -94,23 +94,33 @@ class PersistingAddressIterator(DatabaseAddressIterator):
             yield hash160(pubkey_child.pubkey_bytes), self.n, True
 
 
+class BatchAddressIterator:
+
+    def __init__(self, iterator: PersistingAddressIterator, size):
+        self.iterator = iterator
+        self.size = size
+
+    def __iter__(self) -> Iterator[bytearray]:
+        i = iter(self.iterator)
+        while True:
+            yield [bytearray(next(i)[0]) for _ in range(self.size)]
+
+
 def generate_addresses_using_filters(best_height, allowed_gap, address_manager) -> Set:
     need, have = set(), set()
     matchers = get_filter_matchers(best_height)
-    with PersistingAddressIterator(*address_manager) as addresses:
-        gap = 0
-        for address_hash, n, is_new in addresses:  # pylint: disable=unused-variable
-            gap += 1
-            address_bytes = bytearray(address_hash)
+    with PersistingAddressIterator(*address_manager) as address_iterator:
+        for addresses in BatchAddressIterator(address_iterator, allowed_gap):
+            has_match = False
             for matcher, filter_range in matchers:
-                if matcher.Match(address_bytes):
-                    gap = 0
+                if matcher.MatchAny(addresses):
+                    has_match = True
                     if filter_range not in need and filter_range not in have:
                         if has_filter_range(*filter_range):
                             have.add(filter_range)
                         else:
                             need.add(filter_range)
-            if gap >= allowed_gap:
+            if not has_match:
                 break
     return need
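
BatchAddressIterator wraps the (hash160, n, is_new) stream and yields fixed-size lists of raw hash bytes, so the generation loop above can test a whole gap-window against each filter with one MatchAny call and stop at the first window that matches nothing. Note it assumes an endless underlying stream, which PersistingAddressIterator provides by generating new addresses on demand. A toy sketch of the batching behavior, using a hypothetical stand-in generator (Python does not enforce the PersistingAddressIterator annotation) and assuming BatchAddressIterator is importable from the module above:

    from itertools import count

    def fake_address_stream():
        # stand-in for PersistingAddressIterator: endless (hash, n, is_new) tuples
        for n in count():
            yield (f'hash{n}'.encode(), n, True)

    windows = iter(BatchAddressIterator(fake_address_stream(), size=6))
    first = next(windows)
    assert len(first) == 6                      # one allowed_gap worth of addresses
    assert first[0] == bytearray(b'hash0')      # only the hash bytes are kept
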
@@ -127,12 +137,16 @@ def get_missing_sub_filters_for_addresses(granularity, address_manager):
 
 def get_missing_tx_for_addresses(address_manager):
     need = set()
-    for tx_hash, matcher in get_tx_matchers_for_missing_txs():
-        for address_hash, _, _ in DatabaseAddressIterator(*address_manager):
-            address_bytes = bytearray(address_hash)
-            if matcher.Match(address_bytes):
-                need.add(tx_hash)
-                break
+    filters = get_tx_matchers_for_missing_txs()
+    print(f' loaded tx filters ({len(filters)})')
+    addresses = DatabaseAddressIterator.get_address_hash_bytes(*address_manager)
+    print(f' loaded addresses ({len(addresses)})')
+    print(' matching...')
+    for i, (tx_hash, matcher) in enumerate(filters):
+        if i > 0 and i % 1000 == 0:
+            print(f' {i} of {len(filters)} processed')
+        if matcher.MatchAny(addresses):
+            need.add(tx_hash)
     return need
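
The old version re-walked the address table once per missing-tx filter; the new one loads the address hashes a single time and makes one MatchAny probe per filter, so the database is queried once rather than once per filter. Stripped of the progress printing, the loop reduces to this equivalence sketch:

    def missing_tx_hashes(filters, addresses):
        # same result as the loop above, minus the progress printing:
        # one MatchAny probe per (tx_hash, matcher) pair
        return {tx_hash for tx_hash, matcher in filters if matcher.MatchAny(addresses)}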


@@ -64,8 +64,11 @@ def get_block_headers(first, last=None):
     return rows
 
 
-def insert_transaction(block_hash, tx):
-    context().get_bulk_loader().add_transaction(block_hash, tx).flush(TX)
+def insert_transactions(txs):
+    loader = context().get_bulk_loader()
+    for block_hash, tx in txs:
+        loader.add_transaction(block_hash, tx)
+    loader.flush(return_row_count_for_table=None)
 
 
 def check_version_and_create_tables():
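
insert_transactions queues every row on a single BulkLoader and flushes once, so the batch lands in one set of bulk INSERTs instead of a loader flush per transaction; passing return_row_count_for_table=None skips computing a row count for the return value. Hypothetical usage through the async facade shown earlier (tx_a, tx_b, downloaded_tx are stand-in names):

    async def example(db, block_hash, tx_a, tx_b, downloaded_tx):
        await db.insert_transactions([
            (block_hash, tx_a),      # confirmed txs carry their block hash
            (block_hash, tx_b),
            (None, downloaded_tx),   # None is what download_and_save_txs passes
        ])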


@@ -136,13 +136,17 @@ class FilterManager:
     async def download_and_save_txs(self, tx_hashes):
         if not tx_hashes:
             return
-        txids = [hexlify(tx_hash[::-1]).decode() for tx_hash in tx_hashes]
-        print(f'=> transaction_search(len(txids): {len(txids)})')
-        txs = await self.client.first.transaction_search(txids=txids, raw=True)
-        print(f' @ transaction_search(len(txids): {len(txids)})')
-        for raw_tx in txs.values():
-            await self.db.insert_transaction(None, Transaction(unhexlify(raw_tx)))
-        print(f' # transaction_search(len(txids): {len(txids)})')
+        all_txids = [hexlify(tx_hash[::-1]).decode() for tx_hash in tx_hashes]
+        chunk_size = 10
+        for i in range(0, len(all_txids), chunk_size):
+            txids = all_txids[i:i + chunk_size]
+            print(f' => transaction_search(len(txids): {len(txids)})')
+            txs = await self.client.first.transaction_search(txids=txids, raw=True)
+            print(f' <= transaction_search(len(txids): {len(txids)})')
+            await self.db.insert_transactions([
+                (None, Transaction(unhexlify(raw_tx))) for raw_tx in txs.values()
+            ])
+            print(f' saved {len(txids)} transactions')
 
     async def download_initial_filters(self, best_height):
         missing = await self.db.get_missing_required_filters(best_height)
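
Chunking the download bounds both the size of each transaction_search request and the size of each batched insert. The slicing is the standard fixed-window pattern; a self-contained check with toy data and the diff's chunk size of 10:

    all_txids = [f'txid{n}' for n in range(23)]   # toy data
    chunks = [all_txids[i:i + 10] for i in range(0, len(all_txids), 10)]
    assert [len(c) for c in chunks] == [10, 10, 3]   # last window may be short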


@@ -260,7 +260,7 @@ class AsyncUnitDBTestCase(AsyncioTestCase):
             self.outputs.extend(tx.outputs)
             return block_or_tx
         elif isinstance(block_or_tx, Transaction):
-            await self.db.insert_transaction(block_hash, block_or_tx)
+            await self.db.insert_transactions([(block_hash, block_or_tx)])
             self.outputs.extend(block_or_tx.outputs)
             return block_or_tx.outputs[0]
         else:


@@ -126,8 +126,8 @@ class TestAddressGenerationAndTXSync(UnitDBTestCase):
         q.insert_tx_filters([(hexlify(f'tx{height}'.encode()), height, create_address_filter(addresses))])
 
     def test_generate_from_filters_and_download_txs(self):
-        # 15 addresses will get generated, 9 due to filters and 6 due to gap
-        pubkeys = [self.receiving_pubkey.child(n) for n in range(15)]
+        # 18 addresses will get generated
+        pubkeys = [self.receiving_pubkey.child(n) for n in range(18)]
         hashes = [hash160(key.pubkey_bytes) for key in pubkeys]
         # create all required filters (include 9 of the addresses in the filters)
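
The new expected count of 18 is consistent with the batched gap logic, assuming this test uses an allowed gap of 6 as the old "9 due to filters and 6 due to gap" comment implies: addresses are now generated a window of 6 at a time, windows one and two (addresses 0 through 11) each contain some of the 9 filter-covered addresses, and window three (addresses 12 through 17) matches nothing, so generation stops at 3 × 6 = 18 rather than 9 + 6 = 15.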


@@ -255,7 +255,7 @@ class TransactionIOBalancing(WalletTestCase):
             .add_inputs([self.txi(self.txo(sum(amounts)+0.1))]) \
             .add_outputs(utxos)
 
-        await self.db.insert_transaction(b'beef', self.funding_tx)
+        await self.db.insert_transactions([(b'beef', self.funding_tx)])
 
         return utxos