From af0e9368d431e85a5b99c02d6c4ede3464e2db94 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 16 Mar 2020 11:28:11 -0300 Subject: [PATCH] headers get now async --- lbry/extras/daemon/json_response_encoder.py | 7 ++++--- lbry/stream/stream_manager.py | 2 +- lbry/wallet/header.py | 17 ++++++++++++----- lbry/wallet/ledger.py | 4 ++-- .../blockchain/test_resolve_command.py | 4 ++-- tests/unit/wallet/test_headers.py | 16 ++++++++-------- 6 files changed, 29 insertions(+), 21 deletions(-) diff --git a/lbry/extras/daemon/json_response_encoder.py b/lbry/extras/daemon/json_response_encoder.py index 997e2019e..92a43092c 100644 --- a/lbry/extras/daemon/json_response_encoder.py +++ b/lbry/extras/daemon/json_response_encoder.py @@ -159,6 +159,7 @@ class JSONResponseEncoder(JSONEncoder): return tx_height = txo.tx_ref.height best_height = self.ledger.headers.height + timestamp = self.ledger.headers.synchronous_get(tx_height)['timestamp'] if 0 < tx_height <= best_height else None output = { 'txid': txo.tx_ref.id, 'nout': txo.position, @@ -166,7 +167,7 @@ class JSONResponseEncoder(JSONEncoder): 'amount': dewies_to_lbc(txo.amount), 'address': txo.get_address(self.ledger) if txo.has_address else None, 'confirmations': (best_height+1) - tx_height if tx_height > 0 else tx_height, - 'timestamp': self.ledger.headers[tx_height]['timestamp'] if 0 < tx_height <= best_height else None + 'timestamp': timestamp } if txo.is_spent is not None: output['is_spent'] = txo.is_spent @@ -244,7 +245,7 @@ class JSONResponseEncoder(JSONEncoder): if isinstance(value, int): meta[key] = dewies_to_lbc(value) if 0 < meta.get('creation_height', 0) <= self.ledger.headers.height: - meta['creation_timestamp'] = self.ledger.headers[meta['creation_height']]['timestamp'] + meta['creation_timestamp'] = self.ledger.headers.synchronous_get(meta['creation_height'])['timestamp'] return meta def encode_input(self, txi): @@ -306,7 +307,7 @@ class JSONResponseEncoder(JSONEncoder): 'added_on': managed_stream.added_on, 'height': tx_height, 'confirmations': (best_height + 1) - tx_height if tx_height > 0 else tx_height, - 'timestamp': self.ledger.headers[tx_height]['timestamp'] if 0 < tx_height <= best_height else None, + 'timestamp': self.ledger.headers.synchronous_get(tx_height)['timestamp'] if 0 < tx_height <= best_height else None, 'is_fully_reflected': managed_stream.is_fully_reflected } diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index 92983b368..891af7ddb 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -338,7 +338,7 @@ class StreamManager: 'claim_sequence': -1, 'address': txo.get_address(wallet_manager.ledger), 'valid_at_height': txo.meta.get('activation_height', None), - 'timestamp': wallet_manager.ledger.headers[tx_height]['timestamp'], + 'timestamp': wallet_manager.ledger.headers.synchronous_get(tx_height)['timestamp'], 'supports': [] } else: diff --git a/lbry/wallet/header.py b/lbry/wallet/header.py index 43c7b7743..2adbbff02 100644 --- a/lbry/wallet/header.py +++ b/lbry/wallet/header.py @@ -104,7 +104,14 @@ class Headers: def __bool__(self): return True - def __getitem__(self, height) -> dict: + async def get(self, height) -> dict: + if isinstance(height, slice): + raise NotImplementedError("Slicing of header chain has not been implemented yet.") + if not 0 <= height <= self.height: + raise IndexError(f"{height} is out of bounds, current height: {self.height}") + return self.deserialize(height, self.get_raw_header(height)) + + def synchronous_get(self, height): if isinstance(height, slice): raise NotImplementedError("Slicing of header chain has not been implemented yet.") if not 0 <= height <= self.height: @@ -167,7 +174,7 @@ class Headers: for height, chunk in self._iterate_chunks(start, headers): try: # validate_chunk() is CPU bound and reads previous chunks from file system - self.validate_chunk(height, chunk) + await self.validate_chunk(height, chunk) except InvalidHeader as e: bail = True chunk = chunk[:(height-e.height)*self.header_size] @@ -186,13 +193,13 @@ class Headers: self._size = self.io.tell() // self.header_size return written - def validate_chunk(self, height, chunk): + async def validate_chunk(self, height, chunk): previous_hash, previous_header, previous_previous_header = None, None, None if height > 0: - previous_header = self[height-1] + previous_header = await self.get(height-1) previous_hash = self.hash(height-1) if height > 1: - previous_previous_header = self[height-2] + previous_previous_header = await self.get(height-2) chunk_target = self.get_next_chunk_target(height // 2016 - 1) for current_hash, current_header in self._iterate_headers(height, chunk): block_target = self.get_next_block_target(chunk_target, previous_previous_header, previous_header) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index b6870d636..abe407453 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -601,7 +601,7 @@ class Ledger(metaclass=LedgerRegistry): if 0 < remote_height < len(self.headers): merkle = await self.network.retriable_call(self.network.get_merkle, tx.id, remote_height) merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) - header = self.headers[remote_height] + header = await self.headers.get(remote_height) tx.position = merkle['pos'] tx.is_verified = merkle_root == header['merkle_root'] @@ -899,7 +899,7 @@ class Ledger(metaclass=LedgerRegistry): headers = self.headers history = [] for tx in txs: # pylint: disable=too-many-nested-blocks - ts = headers[tx.height]['timestamp'] if tx.height > 0 else None + ts = headers.synchronous_get(tx.height)['timestamp'] if tx.height > 0 else None item = { 'txid': tx.id, 'timestamp': ts, diff --git a/tests/integration/blockchain/test_resolve_command.py b/tests/integration/blockchain/test_resolve_command.py index 2681e3826..2a9a1fe08 100644 --- a/tests/integration/blockchain/test_resolve_command.py +++ b/tests/integration/blockchain/test_resolve_command.py @@ -49,11 +49,11 @@ class ResolveCommand(BaseResolveTestCase): self.assertTrue(claim['is_channel_signature_valid']) self.assertEqual( claim['timestamp'], - self.ledger.headers[claim['height']]['timestamp'] + self.ledger.headers.synchronous_get(claim['height'])['timestamp'] ) self.assertEqual( claim['signing_channel']['timestamp'], - self.ledger.headers[claim['signing_channel']['height']]['timestamp'] + self.ledger.headers.synchronous_get(claim['signing_channel']['height'])['timestamp'] ) # resolving claim foo by itself diff --git a/tests/unit/wallet/test_headers.py b/tests/unit/wallet/test_headers.py index 456529e75..8499a2b99 100644 --- a/tests/unit/wallet/test_headers.py +++ b/tests/unit/wallet/test_headers.py @@ -15,11 +15,11 @@ def block_bytes(blocks): class TestHeaders(AsyncioTestCase): - def test_deserialize(self): + async def test_deserialize(self): self.maxDiff = None h = Headers(':memory:') h.io.write(HEADERS) - self.assertEqual(h[0], { + self.assertEqual(await h.get(0), { 'bits': 520159231, 'block_height': 0, 'claim_trie_root': b'0000000000000000000000000000000000000000000000000000000000000001', @@ -29,7 +29,7 @@ class TestHeaders(AsyncioTestCase): 'timestamp': 1446058291, 'version': 1 }) - self.assertEqual(h[10], { + self.assertEqual(await h.get(10), { 'bits': 509349720, 'block_height': 10, 'merkle_root': b'f4d8fded6a181d4a8a2817a0eb423cc0f414af29490004a620e66c35c498a554', @@ -112,11 +112,11 @@ class TestHeaders(AsyncioTestCase): await headers.connect(0, HEADERS) self.assertEqual(19, headers.height) with self.assertRaises(IndexError): - _ = headers[3001] + _ = await headers.get(3001) with self.assertRaises(IndexError): - _ = headers[-1] - self.assertIsNotNone(headers[19]) - self.assertIsNotNone(headers[0]) + _ = await headers.get(-1) + self.assertIsNotNone(await headers.get(19)) + self.assertIsNotNone(await headers.get(0)) async def test_repair(self): headers = Headers(':memory:') @@ -166,7 +166,7 @@ class TestHeaders(AsyncioTestCase): for block_index in range(BLOCKS): while len(headers) < block_index: await asyncio.sleep(0.000001) - assert headers[block_index]['block_height'] == block_index + assert (await headers.get(block_index))['block_height'] == block_index reader_task = asyncio.create_task(reader()) await writer() await reader_task