forked from LBRYCommunity/lbry-sdk
headers: get() is now async
This commit is contained in:
parent
285483b81a
commit
af0e9368d4
6 changed files with 29 additions and 21 deletions
|
@ -159,6 +159,7 @@ class JSONResponseEncoder(JSONEncoder):
|
|||
return
|
||||
tx_height = txo.tx_ref.height
|
||||
best_height = self.ledger.headers.height
|
||||
timestamp = self.ledger.headers.synchronous_get(tx_height)['timestamp'] if 0 < tx_height <= best_height else None
|
||||
output = {
|
||||
'txid': txo.tx_ref.id,
|
||||
'nout': txo.position,
|
||||
|
@ -166,7 +167,7 @@ class JSONResponseEncoder(JSONEncoder):
|
|||
'amount': dewies_to_lbc(txo.amount),
|
||||
'address': txo.get_address(self.ledger) if txo.has_address else None,
|
||||
'confirmations': (best_height+1) - tx_height if tx_height > 0 else tx_height,
|
||||
'timestamp': self.ledger.headers[tx_height]['timestamp'] if 0 < tx_height <= best_height else None
|
||||
'timestamp': timestamp
|
||||
}
|
||||
if txo.is_spent is not None:
|
||||
output['is_spent'] = txo.is_spent
|
||||
|
@ -244,7 +245,7 @@ class JSONResponseEncoder(JSONEncoder):
|
|||
if isinstance(value, int):
|
||||
meta[key] = dewies_to_lbc(value)
|
||||
if 0 < meta.get('creation_height', 0) <= self.ledger.headers.height:
|
||||
meta['creation_timestamp'] = self.ledger.headers[meta['creation_height']]['timestamp']
|
||||
meta['creation_timestamp'] = self.ledger.headers.synchronous_get(meta['creation_height'])['timestamp']
|
||||
return meta
|
||||
|
||||
def encode_input(self, txi):
|
||||
|
@ -306,7 +307,7 @@ class JSONResponseEncoder(JSONEncoder):
|
|||
'added_on': managed_stream.added_on,
|
||||
'height': tx_height,
|
||||
'confirmations': (best_height + 1) - tx_height if tx_height > 0 else tx_height,
|
||||
'timestamp': self.ledger.headers[tx_height]['timestamp'] if 0 < tx_height <= best_height else None,
|
||||
'timestamp': self.ledger.headers.synchronous_get(tx_height)['timestamp'] if 0 < tx_height <= best_height else None,
|
||||
'is_fully_reflected': managed_stream.is_fully_reflected
|
||||
}
|
||||
|
||||
|
|
|
@ -338,7 +338,7 @@ class StreamManager:
|
|||
'claim_sequence': -1,
|
||||
'address': txo.get_address(wallet_manager.ledger),
|
||||
'valid_at_height': txo.meta.get('activation_height', None),
|
||||
'timestamp': wallet_manager.ledger.headers[tx_height]['timestamp'],
|
||||
'timestamp': wallet_manager.ledger.headers.synchronous_get(tx_height)['timestamp'],
|
||||
'supports': []
|
||||
}
|
||||
else:
|
||||
|
|
|
@ -104,7 +104,14 @@ class Headers:
|
|||
def __bool__(self):
|
||||
return True
|
||||
|
||||
def __getitem__(self, height) -> dict:
|
||||
async def get(self, height) -> dict:
|
||||
if isinstance(height, slice):
|
||||
raise NotImplementedError("Slicing of header chain has not been implemented yet.")
|
||||
if not 0 <= height <= self.height:
|
||||
raise IndexError(f"{height} is out of bounds, current height: {self.height}")
|
||||
return self.deserialize(height, self.get_raw_header(height))
|
||||
|
||||
def synchronous_get(self, height):
|
||||
if isinstance(height, slice):
|
||||
raise NotImplementedError("Slicing of header chain has not been implemented yet.")
|
||||
if not 0 <= height <= self.height:
|
||||
|
@ -167,7 +174,7 @@ class Headers:
|
|||
for height, chunk in self._iterate_chunks(start, headers):
|
||||
try:
|
||||
# validate_chunk() is CPU bound and reads previous chunks from file system
|
||||
self.validate_chunk(height, chunk)
|
||||
await self.validate_chunk(height, chunk)
|
||||
except InvalidHeader as e:
|
||||
bail = True
|
||||
chunk = chunk[:(height-e.height)*self.header_size]
|
||||
|
@ -186,13 +193,13 @@ class Headers:
|
|||
self._size = self.io.tell() // self.header_size
|
||||
return written
|
||||
|
||||
def validate_chunk(self, height, chunk):
|
||||
async def validate_chunk(self, height, chunk):
|
||||
previous_hash, previous_header, previous_previous_header = None, None, None
|
||||
if height > 0:
|
||||
previous_header = self[height-1]
|
||||
previous_header = await self.get(height-1)
|
||||
previous_hash = self.hash(height-1)
|
||||
if height > 1:
|
||||
previous_previous_header = self[height-2]
|
||||
previous_previous_header = await self.get(height-2)
|
||||
chunk_target = self.get_next_chunk_target(height // 2016 - 1)
|
||||
for current_hash, current_header in self._iterate_headers(height, chunk):
|
||||
block_target = self.get_next_block_target(chunk_target, previous_previous_header, previous_header)
|
||||
|
|
|
@ -601,7 +601,7 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
if 0 < remote_height < len(self.headers):
|
||||
merkle = await self.network.retriable_call(self.network.get_merkle, tx.id, remote_height)
|
||||
merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash)
|
||||
header = self.headers[remote_height]
|
||||
header = await self.headers.get(remote_height)
|
||||
tx.position = merkle['pos']
|
||||
tx.is_verified = merkle_root == header['merkle_root']
|
||||
|
||||
|
@ -899,7 +899,7 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
headers = self.headers
|
||||
history = []
|
||||
for tx in txs: # pylint: disable=too-many-nested-blocks
|
||||
ts = headers[tx.height]['timestamp'] if tx.height > 0 else None
|
||||
ts = headers.synchronous_get(tx.height)['timestamp'] if tx.height > 0 else None
|
||||
item = {
|
||||
'txid': tx.id,
|
||||
'timestamp': ts,
|
||||
|
|
|
@ -49,11 +49,11 @@ class ResolveCommand(BaseResolveTestCase):
|
|||
self.assertTrue(claim['is_channel_signature_valid'])
|
||||
self.assertEqual(
|
||||
claim['timestamp'],
|
||||
self.ledger.headers[claim['height']]['timestamp']
|
||||
self.ledger.headers.synchronous_get(claim['height'])['timestamp']
|
||||
)
|
||||
self.assertEqual(
|
||||
claim['signing_channel']['timestamp'],
|
||||
self.ledger.headers[claim['signing_channel']['height']]['timestamp']
|
||||
self.ledger.headers.synchronous_get(claim['signing_channel']['height'])['timestamp']
|
||||
)
|
||||
|
||||
# resolving claim foo by itself
|
||||
|
|
|
@ -15,11 +15,11 @@ def block_bytes(blocks):
|
|||
|
||||
class TestHeaders(AsyncioTestCase):
|
||||
|
||||
def test_deserialize(self):
|
||||
async def test_deserialize(self):
|
||||
self.maxDiff = None
|
||||
h = Headers(':memory:')
|
||||
h.io.write(HEADERS)
|
||||
self.assertEqual(h[0], {
|
||||
self.assertEqual(await h.get(0), {
|
||||
'bits': 520159231,
|
||||
'block_height': 0,
|
||||
'claim_trie_root': b'0000000000000000000000000000000000000000000000000000000000000001',
|
||||
|
@ -29,7 +29,7 @@ class TestHeaders(AsyncioTestCase):
|
|||
'timestamp': 1446058291,
|
||||
'version': 1
|
||||
})
|
||||
self.assertEqual(h[10], {
|
||||
self.assertEqual(await h.get(10), {
|
||||
'bits': 509349720,
|
||||
'block_height': 10,
|
||||
'merkle_root': b'f4d8fded6a181d4a8a2817a0eb423cc0f414af29490004a620e66c35c498a554',
|
||||
|
@ -112,11 +112,11 @@ class TestHeaders(AsyncioTestCase):
|
|||
await headers.connect(0, HEADERS)
|
||||
self.assertEqual(19, headers.height)
|
||||
with self.assertRaises(IndexError):
|
||||
_ = headers[3001]
|
||||
_ = await headers.get(3001)
|
||||
with self.assertRaises(IndexError):
|
||||
_ = headers[-1]
|
||||
self.assertIsNotNone(headers[19])
|
||||
self.assertIsNotNone(headers[0])
|
||||
_ = await headers.get(-1)
|
||||
self.assertIsNotNone(await headers.get(19))
|
||||
self.assertIsNotNone(await headers.get(0))
|
||||
|
||||
async def test_repair(self):
|
||||
headers = Headers(':memory:')
|
||||
|
@ -166,7 +166,7 @@ class TestHeaders(AsyncioTestCase):
|
|||
for block_index in range(BLOCKS):
|
||||
while len(headers) < block_index:
|
||||
await asyncio.sleep(0.000001)
|
||||
assert headers[block_index]['block_height'] == block_index
|
||||
assert (await headers.get(block_index))['block_height'] == block_index
|
||||
reader_task = asyncio.create_task(reader())
|
||||
await writer()
|
||||
await reader_task
|
||||
|
|
Loading…
Reference in a new issue