fix bad usages of hash and some tests

This commit is contained in:
Victor Shyba 2020-03-21 04:32:03 -03:00
parent 9fc7f9904b
commit 19c0a81c42
11 changed files with 30 additions and 24 deletions

View file

@ -143,7 +143,7 @@ class WalletComponent(Component):
progress = min(max(math.ceil(float(download_height) / float(target_height) * 100), 0), 100)
else:
progress = 100
best_hash = self.wallet_manager.get_best_blockhash()
best_hash = await self.wallet_manager.get_best_blockhash()
result.update({
'headers_synchronization_progress': progress,
'blocks': max(local_height, 0),

View file

@ -370,6 +370,7 @@ class CommandTestCase(IntegrationTestCase):
)
self.extra_wallet_node_port += 1
await wallet_node.start(self.conductor.spv_node, seed=seed)
await wallet_node.ledger.on_ready.first
self.extra_wallet_nodes.append(wallet_node)
upload_dir = os.path.join(wallet_node.data_path, 'uploads')

View file

@ -49,6 +49,8 @@ class Headers:
self.executor = ThreadPoolExecutor(1)
async def open(self):
if not self.executor:
self.executor = ThreadPoolExecutor(1)
if self.path != ':memory:':
if not os.path.exists(self.path):
self.io = open(self.path, 'w+b')
@ -57,7 +59,9 @@ class Headers:
self._size = self.io.seek(0, os.SEEK_END) // self.header_size
async def close(self):
self.executor.shutdown()
if self.executor:
self.executor.shutdown()
self.executor = None
self.io.close()
@staticmethod
@ -142,7 +146,7 @@ class Headers:
async def ensure_chunk_at(self, height):
if await self.has_header(height):
log.info("has header %s", height)
log.debug("has header %s", height)
return
log.info("on-demand fetching height %s", height)
start = (height // 1000) * 1000
@ -208,7 +212,7 @@ class Headers:
# .seek()/.write()/.truncate() might also .flush() when needed
# the goal here is mainly to ensure we're definitely flush()'ing
self.io.flush()
self._size = self.io.tell() // self.header_size
self._size = max(self._size or 0, self.io.tell() // self.header_size)
return written
async def validate_chunk(self, height, chunk):

View file

@ -316,6 +316,8 @@ class Ledger(metaclass=LedgerRegistry):
first_connection = self.network.on_connected.first
asyncio.ensure_future(self.network.start())
await first_connection
async with self._header_processing_lock:
await self._update_tasks.add(self.initial_headers_sync())
await asyncio.gather(*(a.maybe_migrate_certificates() for a in self.accounts))
await asyncio.gather(*(a.save_max_gap() for a in self.accounts))
if len(self.accounts) > 10:
@ -326,10 +328,8 @@ class Ledger(metaclass=LedgerRegistry):
async def join_network(self, *_):
log.info("Subscribing and updating accounts.")
self._update_tasks.add(self.initial_headers_sync())
async with self._header_processing_lock:
await self.headers.ensure_tip()
await self.update_headers()
await self._update_tasks.add(self.initial_headers_sync())
await self.subscribe_accounts()
await self._update_tasks.done.wait()
self._on_ready_controller.add(True)
@ -353,8 +353,9 @@ class Ledger(metaclass=LedgerRegistry):
async def doit():
for height in reversed(range(0, target, 1000)):
await self.headers.ensure_chunk_at(height)
asyncio.ensure_future(doit())
return
await self.headers.ensure_tip()
self._update_tasks.add(doit())
await self.update_headers()
async def update_headers(self, height=None, headers=None, subscription_update=False):
rewound = 0
@ -894,7 +895,7 @@ class Ledger(metaclass=LedgerRegistry):
headers = self.headers
history = []
for tx in txs: # pylint: disable=too-many-nested-blocks
ts = headers.estimated_timestamp(tx.height)['timestamp']
ts = headers.estimated_timestamp(tx.height)
item = {
'txid': tx.id,
'timestamp': ts,

View file

@ -248,10 +248,10 @@ class WalletManager:
log.warning("Failed to migrate %s receiving addresses!",
len(set(receiving_addresses).difference(set(migrated_receiving))))
def get_best_blockhash(self):
async def get_best_blockhash(self):
if len(self.ledger.headers) <= 0:
return self.ledger.genesis_hash
return self.ledger.headers.hash(self.ledger.headers.height).decode()
return (await self.ledger.headers.hash(self.ledger.headers.height)).decode()
def get_unused_address(self):
return self.default_account.receiving.get_or_create_usable_address()

View file

@ -77,6 +77,7 @@ class Conductor:
async def start_wallet(self):
if not self.wallet_started:
await self.wallet_node.start(self.spv_node)
await self.wallet_node.ledger.on_ready.first
self.wallet_started = True
async def stop_wallet(self):

View file

@ -8,7 +8,7 @@ class BlockchainReorganizationTests(IntegrationTestCase):
async def assertBlockHash(self, height):
self.assertEqual(
self.ledger.headers.hash(height).decode(),
(await self.ledger.headers.hash(height)).decode(),
await self.blockchain.get_block_hash(height)
)
@ -16,7 +16,7 @@ class BlockchainReorganizationTests(IntegrationTestCase):
# invalidate current block, move forward 2
self.assertEqual(self.ledger.headers.height, 200)
await self.assertBlockHash(200)
await self.blockchain.invalidate_block(self.ledger.headers.hash(200).decode())
await self.blockchain.invalidate_block((await self.ledger.headers.hash(200)).decode())
await self.blockchain.generate(2)
await self.ledger.on_header.where(lambda e: e.height == 201)
self.assertEqual(self.ledger.headers.height, 201)
@ -24,7 +24,7 @@ class BlockchainReorganizationTests(IntegrationTestCase):
await self.assertBlockHash(201)
# invalidate current block, move forward 3
await self.blockchain.invalidate_block(self.ledger.headers.hash(200).decode())
await self.blockchain.invalidate_block((await self.ledger.headers.hash(200)).decode())
await self.blockchain.generate(3)
await self.ledger.on_header.where(lambda e: e.height == 202)
self.assertEqual(self.ledger.headers.height, 202)

View file

@ -90,13 +90,11 @@ class ReconnectTests(IntegrationTestCase):
while self.conductor.spv_node.server.session_mgr.notified_height < initial_height + 99: # off by 1
await asyncio.sleep(0.1)
self.assertEqual(initial_height, self.ledger.local_height_including_downloaded_height)
# locks header processing so we make sure we are the only ones modifying it
async with self.ledger._header_processing_lock:
await self.ledger.headers.open()
await self.ledger.network.start()
await self.ledger.network.on_connected.first
await self.ledger.initial_headers_sync()
self.assertEqual(initial_height + 100, self.ledger.local_height_including_downloaded_height)
await self.ledger.headers.open()
await self.ledger.network.start()
await self.ledger.network.on_connected.first
await self.ledger.initial_headers_sync()
self.assertEqual(initial_height + 100, self.ledger.local_height_including_downloaded_height)
async def test_connection_drop_still_receives_events_after_reconnected(self):
address1 = await self.account.receiving.get_or_create_usable_address()

View file

@ -337,7 +337,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
blocks = self.ledger.headers.height - start
self.blockchain.block_expected = start - 1
# go back to start
await self.blockchain.invalidate_block(self.ledger.headers.hash(start).decode())
await self.blockchain.invalidate_block((await self.ledger.headers.hash(start)).decode())
# go to previous + 1
await self.generate(blocks + 2)

View file

@ -27,6 +27,7 @@ class SyncTests(IntegrationTestCase):
wallet_node = WalletNode(WalletManager, RegTestLedger, port=self.api_port)
await wallet_node.start(self.conductor.spv_node, seed)
self.started_nodes.append(wallet_node)
await wallet_node.ledger.on_ready.first
return wallet_node
async def test_nodes_with_same_account_stay_in_sync(self):

View file

@ -92,7 +92,7 @@ class TestCostEst(unittest.TestCase):
@unittest.SkipTest
class TestJsonRpc(unittest.TestCase):
def setUp(self):
def noop():
async def noop():
return None
test_utils.reset_time(self)