better output and bulk insert

This commit is contained in:
Lex Berezhny 2021-01-11 10:36:46 -05:00
parent f89c7540b7
commit 17577d8e85
2 changed files with 63 additions and 28 deletions

View file

@ -35,6 +35,9 @@ class LightClient(Service):
await self.client.connect()
await super().start()
await self.client.start_event_streams()
self.wallets.on_change.listen(
lambda _: self.sync.on_block_event.set()
)
async def stop(self):
await super().stop()
@ -82,33 +85,54 @@ class FilterManager:
self.cache = {}
async def download_and_save_filters(self, needed_filters):
if not needed_filters:
print(' nothing to download')
for factor, filter_start, filter_end in needed_filters:
print(f'loop, factor: {factor}, filter start: {filter_start}, filter end: {filter_end}')
if factor == 0:
print(
f'=> address_filter(granularity={factor}, '
f'start_height={filter_start}, end_height={filter_end})'
)
filters = await self.client.first.address_filter(
granularity=factor, start_height=filter_start, end_height=filter_end
)
print(f'tx_filters: {len(filters)}')
for tx_filter in filters:
await self.db.insert_tx_filter(
unhexlify(tx_filter["txid"])[::-1], tx_filter["height"],
unhexlify(tx_filter["filter"])
)
print(
f'<= address_filter(granularity={factor}, '
f'start_height={filter_start}, end_height={filter_end})'
)
print(f' inserting {len(filters)} tx filters...')
await self.db.insert_tx_filters((
unhexlify(tx_filter["txid"])[::-1],
tx_filter["height"],
unhexlify(tx_filter["filter"])
) for tx_filter in filters)
elif factor <= 3:
print(
f'=> address_filter(granularity={factor}, '
f'start_height={filter_start}, end_height={filter_end})'
)
filters = await self.client.first.address_filter(
granularity=factor, start_height=filter_start, end_height=filter_end
)
print(
f'<= address_filter(granularity={factor}, '
f'start_height={filter_start}, end_height={filter_end})'
)
await self.db.insert_block_filters(
(block_filter["height"], factor, unhexlify(block_filter["filter"]))
for block_filter in filters
)
else:
if factor > 1:
step = 10**factor
else:
step = 1
for start in range(filter_start, filter_end+1, step):
for start in range(filter_start, filter_end+1, 10**factor):
print(f'=> address_filter(granularity={factor}, start_height={start})')
filters = await self.client.first.address_filter(
granularity=factor, start_height=start
)
print(f'<= address_filter(granularity={factor}, start_height={start})')
for block_filter in filters:
await self.db.insert_block_filter(
block_filter["height"], factor, unhexlify(block_filter["filter"])
)
await self.db.insert_block_filters(
(block_filter["height"], factor, unhexlify(block_filter["filter"]))
for block_filter in filters
)
async def download_and_save_txs(self, tx_hashes):
if not tx_hashes:
@ -161,18 +185,19 @@ class FilterManager:
await self.download_and_save_txs(missing)
async def download(self, best_height: int, wallets: WalletManager):
print('download_initial_filters')
print('downloading initial filters...')
await self.download_initial_filters(best_height)
print('generate_addresses')
print('generating addresses...')
await self.generate_addresses(best_height, wallets)
print('download_sub_filters 3')
print('downloading level 3 filters...')
await self.download_sub_filters(3, wallets)
print('download_sub_filters 2')
print('downloading level 2 filters...')
await self.download_sub_filters(2, wallets)
print('download_sub_filters 1')
print('downloading level 1 filters...')
await self.download_sub_filters(1, wallets)
print('download_transactions')
print('downloading transactions...')
await self.download_transactions(wallets)
print(f" = finished sync'ing up-to block {best_height} = ")
@staticmethod
def get_root_of_merkle_tree(branches, branch_positions, working_branch):
@ -219,7 +244,7 @@ class BlockHeaderManager:
async def download(self, best_height):
print('downloading blocks...')
our_height = await self.db.get_best_block_height()
for start in range(our_height+1, best_height, 10000):
for start in range(our_height+1, best_height, 50000):
end = min(start+9999, best_height)
print(f'=> block_list(start_height={start}, end_height={end})')
blocks = await self.client.first.block_list(start_height=start, end_height=end)
@ -284,10 +309,12 @@ class FastSync(Sync):
async def advance(self):
height = self.best_height or await self.client.first.block_tip()
await asyncio.wait([
self.blocks.download(height),
self.filters.download(height, self.service.wallets),
])
await self.blocks.download(height)
await self.filters.download(height, self.service.wallets)
# await asyncio.wait([
# self.blocks.download(height),
# self.filters.download(height, self.service.wallets),
# ])
await self._on_synced_controller.add(height)
async def loop(self):

View file

@ -7,6 +7,7 @@ from typing import Optional, Dict
from lbry.db import Database
from lbry.blockchain.dewies import dict_values_to_lbc
from lbry.event import EventController
from .wallet import Wallet
from .account import SingleKey, HierarchicalDeterministic
@ -19,6 +20,8 @@ class WalletManager:
def __init__(self, db: Database):
self.db = db
self.ledger = db.ledger
self._on_change_controller = EventController()
self.on_change = self._on_change_controller.stream
self.wallets: Dict[str, Wallet] = {}
if self.ledger.conf.wallet_storage == "file":
self.storage = FileWallet(self.db, self.ledger.conf.wallet_dir)
@ -112,7 +115,12 @@ class WalletManager:
def add(self, wallet: Wallet) -> Wallet:
self.wallets[wallet.id] = wallet
wallet.on_change.listen(lambda _: self.storage.save(wallet))
def wallet_change_handler(event):
asyncio.create_task(self.storage.save(wallet))
asyncio.create_task(self._on_change_controller.add(event))
wallet.on_change.listen(wallet_change_handler)
return wallet
def remove(self, wallet_id: str) -> Wallet: