291 lines
11 KiB
Python
291 lines
11 KiB
Python
import asyncio
|
|
import logging
|
|
from typing import Dict
|
|
from typing import List, Optional, Tuple
|
|
from binascii import hexlify, unhexlify
|
|
|
|
from lbry.blockchain.block import Block
|
|
from lbry.event import EventController, BroadcastSubscription
|
|
from lbry.crypto.hash import double_sha256
|
|
from lbry.wallet import WalletManager
|
|
from lbry.blockchain import Ledger, Transaction
|
|
from lbry.db import Database
|
|
|
|
from .base import Service, Sync
|
|
from .api import Client as APIClient
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class LightClient(Service):
|
|
|
|
name = "client"
|
|
|
|
sync: 'FastSync'
|
|
|
|
def __init__(self, ledger: Ledger):
|
|
super().__init__(ledger)
|
|
self.client = APIClient(
|
|
f"http://{ledger.conf.full_nodes[0][0]}:{ledger.conf.full_nodes[0][1]}/ws"
|
|
)
|
|
self.sync = FastSync(self, self.client)
|
|
|
|
async def start(self):
|
|
await self.client.connect()
|
|
await super().start()
|
|
await self.client.start_event_streams()
|
|
|
|
async def stop(self):
|
|
await super().stop()
|
|
await self.client.disconnect()
|
|
|
|
async def search_transactions(self, txids):
|
|
return await self.client.transaction_search(txids=txids)
|
|
|
|
async def get_address_filters(self, start_height: int, end_height: int = None, granularity: int = 0):
|
|
return await self.client.first.address_filter(
|
|
granularity=granularity, start_height=start_height, end_height=end_height
|
|
)
|
|
|
|
async def broadcast(self, tx):
|
|
pass
|
|
|
|
async def wait(self, tx: Transaction, height=-1, timeout=1):
|
|
pass
|
|
|
|
async def resolve(self, urls, **kwargs):
|
|
pass
|
|
|
|
async def search_claims(self, accounts, **kwargs):
|
|
pass
|
|
|
|
async def search_supports(self, accounts, **kwargs):
|
|
pass
|
|
|
|
async def sum_supports(
|
|
self, claim_hash: bytes, include_channel_content=False, exclude_own_supports=False
|
|
) -> Tuple[List[Dict], int]:
|
|
return await self.client.sum_supports(claim_hash, include_channel_content, exclude_own_supports)
|
|
|
|
|
|
class FilterManager:
|
|
"""
|
|
Efficient on-demand address filter access.
|
|
Stores and retrieves from local db what it previously downloaded and
|
|
downloads on-demand what it doesn't have from full node.
|
|
"""
|
|
|
|
def __init__(self, db: Database, client: APIClient):
|
|
self.db = db
|
|
self.client = client
|
|
self.cache = {}
|
|
|
|
async def download_and_save_filters(self, needed_filters):
|
|
for factor, start, end in needed_filters:
|
|
print(f'=> address_filter(granularity={factor}, start_height={start}, end_height={end})')
|
|
if factor > 3:
|
|
print('skipping')
|
|
continue
|
|
filters = await self.client.first.address_filter(
|
|
granularity=factor, start_height=start, end_height=end
|
|
)
|
|
print(f'<= address_filter(granularity={factor}, start_height={start}, end_height={end})')
|
|
if factor == 0:
|
|
for tx_filter in filters:
|
|
await self.db.insert_tx_filter(
|
|
unhexlify(tx_filter["txid"])[::-1], tx_filter["height"], unhexlify(tx_filter["filter"])
|
|
)
|
|
else:
|
|
for block_filter in filters:
|
|
await self.db.insert_block_filter(
|
|
block_filter["height"], factor, unhexlify(block_filter["filter"])
|
|
)
|
|
|
|
async def download_and_save_txs(self, tx_hashes):
|
|
if not tx_hashes:
|
|
return
|
|
txids = [hexlify(tx_hash[::-1]).decode() for tx_hash in tx_hashes]
|
|
txs = await self.client.first.transaction_search(txids=txids)
|
|
for raw_tx in txs.values():
|
|
await self.db.insert_transaction(None, Transaction(unhexlify(raw_tx)))
|
|
|
|
async def download_initial_filters(self, best_height):
|
|
missing = await self.db.get_missing_required_filters(best_height)
|
|
await self.download_and_save_filters(missing)
|
|
|
|
async def generate_addresses(self, best_height: int, wallets: WalletManager):
|
|
for wallet in wallets:
|
|
for account in wallet.accounts:
|
|
for address_manager in account.address_managers.values():
|
|
missing = await self.db.generate_addresses_using_filters(
|
|
best_height, address_manager.gap, (
|
|
account.id,
|
|
address_manager.chain_number,
|
|
address_manager.public_key.pubkey_bytes,
|
|
address_manager.public_key.chain_code,
|
|
address_manager.public_key.depth
|
|
)
|
|
)
|
|
await self.download_and_save_filters(missing)
|
|
|
|
async def download_sub_filters(self, granularity: int, wallets: WalletManager):
|
|
for wallet in wallets:
|
|
for account in wallet.accounts:
|
|
for address_manager in account.address_managers.values():
|
|
missing = await self.db.get_missing_sub_filters_for_addresses(
|
|
granularity, (account.id, address_manager.chain_number)
|
|
)
|
|
await self.download_and_save_filters(missing)
|
|
|
|
async def download_transactions(self, wallets: WalletManager):
|
|
for wallet in wallets:
|
|
for account in wallet.accounts:
|
|
for address_manager in account.address_managers.values():
|
|
missing = await self.db.get_missing_tx_for_addresses(
|
|
(account.id, address_manager.chain_number)
|
|
)
|
|
await self.download_and_save_txs(missing)
|
|
|
|
async def download(self, best_height: int, wallets: WalletManager):
|
|
print('download_initial_filters')
|
|
await self.download_initial_filters(best_height)
|
|
print('generate_addresses')
|
|
await self.generate_addresses(best_height, wallets)
|
|
print('download_sub_filters 3')
|
|
await self.download_sub_filters(3, wallets)
|
|
print('download_sub_filters 2')
|
|
await self.download_sub_filters(2, wallets)
|
|
print('download_sub_filters 1')
|
|
await self.download_sub_filters(1, wallets)
|
|
print('download_transactions')
|
|
await self.download_transactions(wallets)
|
|
|
|
@staticmethod
|
|
def get_root_of_merkle_tree(branches, branch_positions, working_branch):
|
|
for i, branch in enumerate(branches):
|
|
other_branch = unhexlify(branch)[::-1]
|
|
other_branch_on_left = bool((branch_positions >> i) & 1)
|
|
if other_branch_on_left:
|
|
combined = other_branch + working_branch
|
|
else:
|
|
combined = working_branch + other_branch
|
|
working_branch = double_sha256(combined)
|
|
return hexlify(working_branch[::-1])
|
|
|
|
# async def maybe_verify_transaction(self, tx, remote_height, merkle=None):
|
|
# tx.height = remote_height
|
|
# cached = self._tx_cache.get(tx.hash)
|
|
# if not cached:
|
|
# # cache txs looked up by transaction_show too
|
|
# cached = TransactionCacheItem()
|
|
# cached.tx = tx
|
|
# self._tx_cache[tx.hash] = cached
|
|
# if 0 < remote_height < len(self.headers) and cached.pending_verifications <= 1:
|
|
# # can't be tx.pending_verifications == 1 because we have to handle the transaction_show case
|
|
# if not merkle:
|
|
# merkle = await self.network.retriable_call(self.network.get_merkle, tx.hash, remote_height)
|
|
# merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash)
|
|
# header = await self.headers.get(remote_height)
|
|
# tx.position = merkle['pos']
|
|
# tx.is_verified = merkle_root == header['merkle_root']
|
|
|
|
|
|
class BlockHeaderManager:
|
|
"""
|
|
Efficient on-demand block header access.
|
|
Stores and retrieves from local db what it previously downloaded and
|
|
downloads on-demand what it doesn't have from full node.
|
|
"""
|
|
|
|
def __init__(self, db: Database, client: APIClient):
|
|
self.db = db
|
|
self.client = client
|
|
self.cache = {}
|
|
|
|
async def download(self, best_height):
|
|
print('downloading blocks...')
|
|
our_height = await self.db.get_best_block_height()
|
|
print(f'=> block_list(start_height={our_height+1}, end_height={best_height})')
|
|
blocks = await self.client.first.block_list(start_height=our_height+1, end_height=best_height)
|
|
print(f'<= block_list(start_height={our_height+1}, end_height={best_height})')
|
|
for block in blocks:
|
|
if block["height"] % 10000 == 0 or block["height"] < 2:
|
|
print(f'block {block["height"]}')
|
|
await self.db.insert_block(Block(
|
|
height=block["height"],
|
|
version=0,
|
|
file_number=0,
|
|
block_hash=unhexlify(block["block_hash"]),
|
|
prev_block_hash=unhexlify(block["previous_hash"]),
|
|
merkle_root=b'', # block["merkle_root"],
|
|
claim_trie_root=b'', # block["claim_trie_root"],
|
|
timestamp=block["timestamp"],
|
|
bits=0, # block["bits"],
|
|
nonce=0, # block["nonce"],
|
|
txs=[]
|
|
))
|
|
|
|
async def get_header(self, height):
|
|
blocks = await self.client.first.block_list(height=height)
|
|
if blocks:
|
|
return blocks[0]
|
|
|
|
|
|
class FastSync(Sync):
|
|
|
|
def __init__(self, service: Service, client: APIClient):
|
|
super().__init__(service.ledger, service.db)
|
|
self.service = service
|
|
self.client = client
|
|
self.advance_loop_task: Optional[asyncio.Task] = None
|
|
self.on_block = client.get_event_stream("blockchain.block")
|
|
self.on_block_event = asyncio.Event()
|
|
self.on_block_subscription: Optional[BroadcastSubscription] = None
|
|
self._on_synced_controller = EventController()
|
|
self.on_synced = self._on_synced_controller.stream
|
|
self.conf.events.register("blockchain.block", self.on_synced)
|
|
self.blocks = BlockHeaderManager(self.db, self.client)
|
|
self.filters = FilterManager(self.db, self.client)
|
|
self.best_height: Optional[int] = None
|
|
|
|
async def get_block_headers(self, start_height: int, end_height: int = None):
|
|
return await self.client.first.block_list(start_height, end_height)
|
|
|
|
async def get_best_block_height(self) -> int:
|
|
return await self.client.first.block_tip()
|
|
|
|
async def start(self):
|
|
self.on_block_subscription = self.on_block.listen(self.handle_on_block)
|
|
self.advance_loop_task = asyncio.create_task(self.advance())
|
|
await self.advance_loop_task
|
|
self.advance_loop_task = asyncio.create_task(self.loop())
|
|
|
|
async def stop(self):
|
|
for task in (self.on_block_subscription, self.advance_loop_task):
|
|
if task is not None:
|
|
task.cancel()
|
|
|
|
def handle_on_block(self, e):
|
|
self.best_height = e[0]
|
|
self.on_block_event.set()
|
|
|
|
async def advance(self):
|
|
height = self.best_height or await self.client.first.block_tip()
|
|
await asyncio.wait([
|
|
self.blocks.download(height),
|
|
self.filters.download(height, self.service.wallets),
|
|
])
|
|
await self._on_synced_controller.add(height)
|
|
|
|
async def loop(self):
|
|
while True:
|
|
try:
|
|
await self.on_block_event.wait()
|
|
self.on_block_event.clear()
|
|
await self.advance()
|
|
except asyncio.CancelledError:
|
|
return
|
|
except Exception as e:
|
|
log.exception(e)
|
|
await self.stop()
|