sync_apply progress

This commit is contained in:
Lex Berezhny 2020-12-16 10:54:39 -05:00
parent 5d738534cb
commit becef285f6
7 changed files with 185 additions and 180 deletions

View file

@ -41,7 +41,7 @@ def get_best_block_height():
def insert_block(block):
context().get_bulk_loader().add_block(block).flush()
context().get_bulk_loader().add_block(block).flush(return_row_count_for_table=None)
def get_block_headers(first, last=None):

View file

@ -14,7 +14,7 @@ from aiohttp import ClientSession
from lbry.conf import Setting, NOT_SET
from lbry.db import TXO_TYPES, CLAIM_TYPE_NAMES
from lbry.db.utils import constrain_single_or_list
from lbry.wallet import Wallet, Account, SingleKey, HierarchicalDeterministic
from lbry.wallet import Wallet, Account, SingleKey, HierarchicalDeterministic, ENCRYPT_ON_DISK
from lbry.blockchain import Transaction, Output, dewies_to_lbc, dict_values_to_lbc
from lbry.stream.managed_stream import ManagedStream
from lbry.event import EventController, EventStream
@ -675,7 +675,7 @@ class API:
if value and isinstance(value, str) and value[0] in ('[', '{'):
value = json.loads(value)
wallet.preferences[key] = value
wallet.save()
await wallet.notify_change('preference.set')
return {key: value}
WALLET_DOC = """
@ -1151,23 +1151,23 @@ class API:
wallet_changed = False
if data is not None:
added_accounts = await wallet.merge(password, data)
if added_accounts and self.ledger.sync.network.is_connected:
if blocking:
await asyncio.wait([
a.ledger.subscribe_account(a) for a in added_accounts
])
else:
for new_account in added_accounts:
asyncio.create_task(self.ledger.subscribe_account(new_account))
# if added_accounts and self.ledger.sync.network.is_connected:
# if blocking:
# await asyncio.wait([
# a.ledger.subscribe_account(a) for a in added_accounts
# ])
# else:
# for new_account in added_accounts:
# asyncio.create_task(self.ledger.subscribe_account(new_account))
wallet_changed = True
if wallet.preferences.get(ENCRYPT_ON_DISK, False) and password != wallet.encryption_password:
wallet.encryption_password = password
wallet_changed = True
if wallet_changed:
wallet.save()
await wallet.notify_change('sync')
encrypted = wallet.pack(password)
return {
'hash': self.sync_hash(wallet_id),
'hash': await self.sync_hash(wallet_id),
'data': encrypted.decode()
}
@ -2545,9 +2545,15 @@ class API:
Usage:
block list <start_height> [<end_height>]
"""
return await self.service.sync.get_block_headers(
headers = await self.service.sync.get_block_headers(
start_height=start_height, end_height=end_height
)
for header in headers:
header.update({
'block_hash': hexlify(header['block_hash']),
'previous_hash': hexlify(header['previous_hash'])
})
return headers
async def block_tip(self) -> int: # block number at the tip of the blockchain
"""
@ -3457,7 +3463,12 @@ class Client(API):
else:
raise ValueError(f'Unknown message received: {d}')
async def send(self, method, **kwargs) -> EventStream:
async def send(self, method, *args, **kwargs) -> EventStream:
if args:
raise TypeError(
f"API client requires all named parameter arguments, "
f"received positional arguments instead: {args}"
)
self.message_id += 1
self.requests[self.message_id] = ec = EventController()
await self.ws.send_json({'id': self.message_id, 'method': method, 'params': kwargs})

View file

@ -111,14 +111,13 @@ class FilterManager:
self.cache = {}
async def download(self):
filters_response = await self.client.get_address_filters(0, 500)
filters = await filters_response.first
address = None
address_array = [bytearray(self.client.ledger.address_to_hash160(address))]
for address_filter in filters:
print(address_filter)
address_filter = get_address_filter(unhexlify(address_filter['filter']))
print(address_filter.MatchAny(address_array))
filters = await self.client.first.address_filter(start_height=0, end_height=500, granularity=0)
#address = None
#address_array = [bytearray(self.db.ledger.address_to_hash160(address))]
#for address_filter in filters:
# print(address_filter)
# address_filter = get_address_filter(unhexlify(address_filter['filter']))
# print(address_filter.MatchAny(address_array))
# address_array = [
@ -159,19 +158,19 @@ class BlockHeaderManager:
async def download(self):
our_height = await self.db.get_best_block_height()
best_height = await self.client.block_tip()
for block in await self.client.block_list(our_height+1, best_height):
best_height = await self.client.first.block_tip()
for block in await self.client.first.block_list(start_height=our_height+1, end_height=best_height):
await self.db.insert_block(Block(
height=block["height"],
version=0,
file_number=0,
block_hash=block["block_hash"],
prev_block_hash=block["previous_hash"],
merkle_root=block["merkle_root"],
claim_trie_root=block["claim_trie_root"],
block_hash=unhexlify(block["block_hash"]),
prev_block_hash=unhexlify(block["previous_hash"]),
merkle_root=b'', # block["merkle_root"],
claim_trie_root=b'', # block["claim_trie_root"],
timestamp=block["timestamp"],
bits=block["bits"],
nonce=block["nonce"],
bits=0, # block["bits"],
nonce=0, # block["nonce"],
txs=[]
))

View file

@ -27,7 +27,7 @@ from lbry.blockchain.bcd_data_stream import BCDataStream
from lbry.blockchain.lbrycrd import Lbrycrd
from lbry.blockchain.dewies import lbc_to_dewies
from lbry.constants import COIN, CENT, NULL_HASH32
from lbry.service import Daemon, FullNode, FullEndpoint, LightClient, jsonrpc_dumps_pretty
from lbry.service import API, Daemon, Service, FullNode, FullEndpoint, LightClient, jsonrpc_dumps_pretty
from lbry.conf import Config
from lbry.console import Console
from lbry.wallet import Wallet, Account
@ -404,15 +404,8 @@ class IntegrationTestCase(AsyncioTestCase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.ledger: Optional[RegTestLedger] = None
self.chain: Optional[Lbrycrd] = None
self.block_expected = 0
self._pg_db_counter = 0
self._api_port = 5252
self.service = None
self.api = None
self.wallet: Optional[Wallet] = None
self.account: Optional[Account] = None
self.chain: Lbrycrd = None
async def asyncSetUp(self):
await super().asyncSetUp()
@ -421,29 +414,37 @@ class IntegrationTestCase(AsyncioTestCase):
self.addCleanup(self.chain.stop)
await self.chain.start(*self.LBRYCRD_ARGS)
async def provision_db_from_environment(self, conf):
self.db_driver = os.environ.get('TEST_DB', 'sqlite')
if self.db_driver == 'sqlite':
async def provision_db_from_environment(self, conf, uid):
test_db = os.environ.get('TEST_DB', 'sqlite')
if test_db == 'sqlite':
pass
elif self.db_driver.startswith('postgres') or self.db_driver.startswith('psycopg'):
self._pg_db_counter += 1
self.db_driver = 'postgresql'
db_name = f'lbry_test_db_{self._pg_db_counter}'
elif test_db.startswith('postgres') or test_db.startswith('psycopg'):
db_name = f'lbry_test_db_{uid}'
db_connection = 'postgres:postgres@localhost:5432'
meta_db = Database.from_url(f'postgresql://{db_connection}/postgres')
await meta_db.drop(db_name)
await meta_db.create(db_name)
conf.db_url = f'postgresql://{db_connection}/{db_name}'
else:
raise RuntimeError(f"Unsupported database driver: {self.db_driver}")
raise RuntimeError(
f"Unsupported TEST_DB: '{test_db}', "
f"valid options: sqlite, postgres."
)
async def make_daemons_from_environment(self, **kwargs) -> Tuple[Daemon, Daemon]:
full_node_daemon = client_daemon = await self.make_full_node_daemon(**kwargs)
self.test_mode = os.environ.get('TEST_MODE', 'node')
if self.test_mode == 'client':
full_node_daemon = await self.make_full_node_daemon(**kwargs)
test_mode = os.environ.get('TEST_MODE', 'node')
if test_mode == 'node':
client_daemon = full_node_daemon
elif test_mode == 'client':
client_daemon = await self.make_light_client_daemon(full_node_daemon, **kwargs)
elif self.test_mode == 'endpoint':
elif test_mode == 'endpoint':
client_daemon = await self.make_full_endpoint_daemon(full_node_daemon, **kwargs)
else:
raise RuntimeError(
f"Unsupported TEST_MODE: '{test_mode}', "
f"valid options: node, client, endpoint."
)
return full_node_daemon, client_daemon
async def make_full_node_daemon(self, start=True, **conf_kwargs):
@ -459,7 +460,7 @@ class IntegrationTestCase(AsyncioTestCase):
lbrycrd_zmq=self.chain.ledger.conf.lbrycrd_zmq,
**conf_kwargs
)
await self.provision_db_from_environment(conf)
await self.provision_db_from_environment(conf, self._api_port)
ledger = RegTestLedger(conf)
service = FullNode(ledger)
console = Console(service)
@ -479,7 +480,7 @@ class IntegrationTestCase(AsyncioTestCase):
full_nodes=[(full_node.conf.api_host, full_node.conf.api_port)],
**conf_kwargs
)
await self.provision_db_from_environment(conf)
await self.provision_db_from_environment(conf, self._api_port)
ledger = RegTestLedger(conf)
service = FullEndpoint(ledger)
console = Console(service)
@ -489,14 +490,17 @@ class IntegrationTestCase(AsyncioTestCase):
await daemon.start()
return daemon
async def make_light_client_daemon(self, full_node, start=True):
async def make_light_client_daemon(self, full_node, start=True, **conf_kwargs):
self._api_port += 1
path = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, path, True)
self._node_port += 1
ledger = RegTestLedger(Config.with_same_dir(path).set(
api=f'localhost:{self._node_port}',
full_nodes=[(full_node.conf.api_host, full_node.conf.api_port)]
))
conf = Config.with_same_dir(path).set(
blockchain="regtest",
api=f"localhost:{self._api_port}",
full_nodes=[(full_node.conf.api_host, full_node.conf.api_port)],
**conf_kwargs
)
ledger = RegTestLedger(conf)
service = LightClient(ledger)
console = Console(service)
daemon = Daemon(service, console)
@ -521,6 +525,62 @@ class IntegrationTestCase(AsyncioTestCase):
balance = await account.get_balance()
self.assertEqual(dewies_to_lbc(balance['available']), expected_balance)
class FakeExchangeRateManager(ExchangeRateManager):
def __init__(self, market_feeds, rates): # pylint: disable=super-init-not-called
self.market_feeds = market_feeds
for feed in self.market_feeds:
feed.last_check = time.time()
feed.rate = ExchangeRate(feed.market, rates[feed.market], time.time())
def start(self):
pass
def stop(self):
pass
def get_fake_exchange_rate_manager(rates=None):
return FakeExchangeRateManager(
[LBRYFeed(), LBRYBTCFeed()],
rates or {'BTCLBC': 3.0, 'USDBTC': 2.0}
)
class CommandTestCase(IntegrationTestCase):
VERBOSITY = logging.WARN
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.block_expected = 0
self.full_node_daemon: Daemon = None
self.daemon: Daemon = None
self.service: Service = None
self.conf: Config = None
self.api: API = None
async def asyncSetUp(self):
await super().asyncSetUp()
await self.generate(200, wait=False)
self.full_node_daemon, self.daemon = await self.make_daemons_from_environment()
self.service = self.daemon.service
self.conf = self.daemon.conf
self.api = self.daemon.api
self.wallet = self.service.wallets.default
self.account = self.wallet.accounts.default
address = await self.account.receiving.get_or_create_usable_address()
self.conf.upload_dir = os.path.join(self.conf.data_dir, 'uploads')
os.mkdir(self.conf.upload_dir)
await self.chain.send_to_address(address, '10.0')
await self.generate(5)
def broadcast(self, tx):
return self.ledger.broadcast(tx)
@ -554,75 +614,6 @@ class IntegrationTestCase(AsyncioTestCase):
lambda e: e.tx.id == tx.id and e.address == address
)
class FakeExchangeRateManager(ExchangeRateManager):
def __init__(self, market_feeds, rates): # pylint: disable=super-init-not-called
self.market_feeds = market_feeds
for feed in self.market_feeds:
feed.last_check = time.time()
feed.rate = ExchangeRate(feed.market, rates[feed.market], time.time())
def start(self):
pass
def stop(self):
pass
def get_fake_exchange_rate_manager(rates=None):
return FakeExchangeRateManager(
[LBRYFeed(), LBRYBTCFeed()],
rates or {'BTCLBC': 3.0, 'USDBTC': 2.0}
)
class CommandTestCase(IntegrationTestCase):
VERBOSITY = logging.WARN
blob_lru_cache_size = 0
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.daemon_port = 5252
self.daemon = None
self.daemons = []
self.server_config = None
self.server_storage = None
self.extra_wallet_nodes = []
self.extra_wallet_node_port = 5281
self.server_blob_manager = None
self.server = None
self.reflector = None
async def asyncSetUp(self):
await super().asyncSetUp()
await self.generate(200, wait=False)
self.full_node_daemon, self.daemon = await self.make_daemons_from_environment()
self.service = self.daemon.service
self.ledger = self.service.ledger
self.api = self.daemon.api
self.wallet = self.service.wallets.default
self.account = self.wallet.accounts.default
address = await self.account.receiving.get_or_create_usable_address()
self.ledger.conf.upload_dir = os.path.join(self.ledger.conf.data_dir, 'uploads')
os.mkdir(self.ledger.conf.upload_dir)
await self.chain.send_to_address(address, '10.0')
await self.generate(5)
async def asyncTearDown(self):
await super().asyncTearDown()
for wallet_node in self.extra_wallet_nodes:
await wallet_node.stop(cleanup=True)
for daemon in self.daemons:
daemon.component_manager.get_component('wallet')._running = False
await daemon.stop()
async def confirm_tx(self, txid, ledger=None):
""" Wait for tx to be in mempool, then generate a block, wait for tx to be in a block. """
await self.on_transaction_id(txid, ledger)
@ -702,7 +693,7 @@ class CommandTestCase(IntegrationTestCase):
def create_upload_file(self, data, prefix=None, suffix=None):
file_path = tempfile.mktemp(
prefix=prefix or "tmp", suffix=suffix or "", dir=self.ledger.conf.upload_dir
prefix=prefix or "tmp", suffix=suffix or "", dir=self.conf.upload_dir
)
with open(file_path, 'w+b') as file:
file.write(data)

View file

@ -1,3 +1,3 @@
from .wallet import Wallet
from .wallet import Wallet, ENCRYPT_ON_DISK
from .manager import WalletManager
from .account import Account, SingleKey, HierarchicalDeterministic

View file

@ -61,10 +61,11 @@ class Wallet:
self.supports = SupportListManager(self)
async def generate_addresses(self):
await asyncio.wait([
account.ensure_address_gap()
for account in self.accounts
])
if self.accounts:
await asyncio.wait([
account.ensure_address_gap()
for account in self.accounts
])
async def notify_change(self, field: str, value=None):
await self._on_change_controller.add({

View file

@ -152,31 +152,30 @@ class WalletCommands(CommandTestCase):
class WalletEncryptionAndSynchronization(IntegrationTestCase):
SEED = (
"carbon smart garage balance margin twelve chest "
"sword toast envelope bottom stomach absent"
)
async def asyncSetUp(self):
await super().asyncSetUp()
await self.chain.generate(200)
self.full_node_daemon, self.daemon1 = await self.make_daemons_from_environment(
create_default_wallet=False
create_default_account=False
)
self.daemon2 = await self.make_light_client_daemon(
self.full_node_daemon, create_default_wallet=False
self.full_node_daemon,
create_default_account=False
)
self.other_wallet = self.other_node.service.wallets.default
self.other_account = self.other_wallet.accounts.default
address = await self.other_account.receiving.get_or_create_usable_address()
self.account1 = await self.daemon1.api.account_add('account1', seed=(
"carbon smart garage balance margin twelve chest "
"sword toast envelope bottom stomach absent"
))
self.account2 = await self.daemon2.api.account_add('account2', seed=(
"chest sword toast envelope bottom stomach absent "
"carbon smart garage balance margin twelve"
))
address = await self.account1.receiving.get_or_create_usable_address()
await self.chain.send_to_address(address, '10.0')
await self.generate(5)
self.daemon2 = await self.add_full_node(
seed="chest sword toast envelope bottom stomach absent "
"carbon smart garage balance margin twelve"
)
address = (await self.daemon2.wallet_manager.default_account.receiving.get_addresses(limit=1, only_usable=True))[0]
sendtxid = await self.blockchain.send_to_address(address, 1)
await self.confirm_tx(sendtxid, self.daemon2.ledger)
await self.chain.generate(1)
address = await self.account2.receiving.get_or_create_usable_address()
await self.chain.send_to_address(address, '1.0')
await self.chain.generate(1)
def assertWalletEncrypted(self, wallet_path, encrypted):
with open(wallet_path) as opened:
@ -184,58 +183,62 @@ class WalletEncryptionAndSynchronization(IntegrationTestCase):
self.assertEqual(wallet['accounts'][0]['private_key'][1:4] != 'prv', encrypted)
async def test_sync(self):
daemon, daemon2 = self.daemon, self.daemon2
api1, api2 = self.daemon1.api, self.daemon2.api
# Preferences
self.assertFalse(daemon.jsonrpc_preference_get())
self.assertFalse(daemon2.jsonrpc_preference_get())
self.assertFalse(await api1.preference_get())
self.assertFalse(await api2.preference_get())
daemon.jsonrpc_preference_set("fruit", '["peach", "apricot"]')
daemon.jsonrpc_preference_set("one", "1")
daemon.jsonrpc_preference_set("conflict", "1")
daemon2.jsonrpc_preference_set("another", "A")
await api1.preference_set("fruit", '["peach", "apricot"]')
await api1.preference_set("one", "1")
await api1.preference_set("conflict", "1")
await api2.preference_set("another", "A")
await asyncio.sleep(1)
# these preferences will win after merge since they are "newer"
daemon2.jsonrpc_preference_set("two", "2")
daemon2.jsonrpc_preference_set("conflict", "2")
daemon.jsonrpc_preference_set("another", "B")
await api2.preference_set("two", "2")
await api2.preference_set("conflict", "2")
await api1.preference_set("another", "B")
self.assertDictEqual(daemon.jsonrpc_preference_get(), {
self.assertDictEqual(await api1.preference_get(), {
"one": "1", "conflict": "1", "another": "B", "fruit": ["peach", "apricot"]
})
self.assertDictEqual(daemon2.jsonrpc_preference_get(), {
self.assertDictEqual(await api2.preference_get(), {
"two": "2", "conflict": "2", "another": "A"
})
self.assertItemCount(await daemon.jsonrpc_account_list(), 1)
self.assertEqual(len(await api1.account_list()), 1)
data = await daemon2.jsonrpc_sync_apply('password')
await daemon.jsonrpc_sync_apply('password', data=data['data'], blocking=True)
data = await api2.sync_apply('password')
await api1.sync_apply('password', data=data['data'], blocking=True)
self.assertItemCount(await daemon.jsonrpc_account_list(), 2)
self.assertEqual(len(await api1.account_list()), 2)
self.assertDictEqual(
# "two" key added and "conflict" value changed to "2"
daemon.jsonrpc_preference_get(),
await api1.preference_get(),
{"one": "1", "two": "2", "conflict": "2", "another": "B", "fruit": ["peach", "apricot"]}
)
# Channel Certificate
channel = await daemon2.jsonrpc_channel_create('@foo', '0.1')
await self.confirm_tx(channel.id, self.daemon2.ledger)
tx = await api2.channel_create('@foo', '0.1')
await self.chain.generate(1)
await asyncio.wait([
api1.service.wait(tx),
api2.service.wait(tx),
])
# both daemons will have the channel but only one has the cert so far
self.assertItemCount(await daemon.jsonrpc_channel_list(), 1)
self.assertEqual(len(daemon.wallet_manager.default_wallet.accounts[1].channel_keys), 0)
self.assertItemCount(await daemon2.jsonrpc_channel_list(), 1)
self.assertEqual(len(daemon2.wallet_manager.default_account.channel_keys), 1)
self.assertEqual(len(await api1.channel_list()), 1)
self.assertEqual(len(self.account1.channel_keys), 0)
self.assertEqual(len(await api2.channel_list()), 1)
self.assertEqual(len(self.account2.channel_keys), 0)
data = await daemon2.jsonrpc_sync_apply('password')
await daemon.jsonrpc_sync_apply('password', data=data['data'], blocking=True)
data = await api2.sync_apply('password')
await api1.sync_apply('password', data=data['data'], blocking=True)
# both daemons have the cert after sync'ing
self.assertEqual(
daemon2.wallet_manager.default_account.channel_keys,
daemon.wallet_manager.default_wallet.accounts[1].channel_keys
self.account1.channel_keys,
self.account2.channel_keys
)
async def test_encryption_and_locking(self):