forked from LBRYCommunity/lbry-sdk
Merge pull request #2885 from lbryio/repair_tip_on_open
repair headers tip on open and let new headers come in during background fetching
This commit is contained in:
commit
c8f431447c
3 changed files with 45 additions and 9 deletions
|
@ -59,7 +59,15 @@ class Headers:
|
||||||
self.io = open(self.path, 'w+b')
|
self.io = open(self.path, 'w+b')
|
||||||
else:
|
else:
|
||||||
self.io = open(self.path, 'r+b')
|
self.io = open(self.path, 'r+b')
|
||||||
self._size = self.io.seek(0, os.SEEK_END) // self.header_size
|
bytes_size = self.io.seek(0, os.SEEK_END)
|
||||||
|
self._size = bytes_size // self.header_size
|
||||||
|
max_checkpointed_height = max(self.checkpoints.keys() or [-1]) + 1000
|
||||||
|
if bytes_size % self.header_size:
|
||||||
|
log.warning("Reader file size doesnt match header size. Repairing, might take a while.")
|
||||||
|
await self.repair()
|
||||||
|
else:
|
||||||
|
# try repairing any incomplete write on tip from previous runs (outside of checkpoints, that are ok)
|
||||||
|
await self.repair(start_height=max_checkpointed_height)
|
||||||
await self.ensure_checkpointed_size()
|
await self.ensure_checkpointed_size()
|
||||||
await self.get_all_missing_headers()
|
await self.get_all_missing_headers()
|
||||||
|
|
||||||
|
@ -292,23 +300,26 @@ class Headers:
|
||||||
height, f"insufficient proof of work: {proof_of_work.value} vs target {target.value}"
|
height, f"insufficient proof of work: {proof_of_work.value} vs target {target.value}"
|
||||||
)
|
)
|
||||||
|
|
||||||
async def repair(self):
|
async def repair(self, start_height=0):
|
||||||
previous_header_hash = fail = None
|
previous_header_hash = fail = None
|
||||||
batch_size = 36
|
batch_size = 36
|
||||||
for start_height in range(0, self.height, batch_size):
|
for height in range(start_height, self.height, batch_size):
|
||||||
headers = await asyncio.get_running_loop().run_in_executor(
|
headers = await asyncio.get_running_loop().run_in_executor(
|
||||||
self.executor, self._read, start_height, batch_size
|
self.executor, self._read, height, batch_size
|
||||||
)
|
)
|
||||||
if len(headers) % self.header_size != 0:
|
if len(headers) % self.header_size != 0:
|
||||||
headers = headers[:(len(headers) // self.header_size) * self.header_size]
|
headers = headers[:(len(headers) // self.header_size) * self.header_size]
|
||||||
for header_hash, header in self._iterate_headers(start_height, headers):
|
for header_hash, header in self._iterate_headers(height, headers):
|
||||||
height = header['block_height']
|
height = header['block_height']
|
||||||
if height:
|
if previous_header_hash:
|
||||||
if header['prev_block_hash'] != previous_header_hash:
|
if header['prev_block_hash'] != previous_header_hash:
|
||||||
fail = True
|
fail = True
|
||||||
else:
|
elif height == 0:
|
||||||
if header_hash != self.genesis_hash:
|
if header_hash != self.genesis_hash:
|
||||||
fail = True
|
fail = True
|
||||||
|
else:
|
||||||
|
# for sanity and clarity, since it is the only way we can end up here
|
||||||
|
assert start_height > 0 and height == start_height
|
||||||
if fail:
|
if fail:
|
||||||
log.warning("Header file corrupted at height %s, truncating it.", height - 1)
|
log.warning("Header file corrupted at height %s, truncating it.", height - 1)
|
||||||
def __truncate(at_height):
|
def __truncate(at_height):
|
||||||
|
|
|
@ -354,8 +354,8 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
self.headers.chunk_getter = get_chunk
|
self.headers.chunk_getter = get_chunk
|
||||||
|
|
||||||
async def doit():
|
async def doit():
|
||||||
async with self._header_processing_lock:
|
|
||||||
for height in reversed(sorted(self.headers.known_missing_checkpointed_chunks)):
|
for height in reversed(sorted(self.headers.known_missing_checkpointed_chunks)):
|
||||||
|
async with self._header_processing_lock:
|
||||||
await self.headers.ensure_chunk_at(height)
|
await self.headers.ensure_chunk_at(height)
|
||||||
self._other_tasks.add(doit())
|
self._other_tasks.add(doit())
|
||||||
await self.update_headers()
|
await self.update_headers()
|
||||||
|
|
|
@ -143,6 +143,31 @@ class TestHeaders(AsyncioTestCase):
|
||||||
self.assertEqual(7, headers.height)
|
self.assertEqual(7, headers.height)
|
||||||
await headers.connect(len(headers), HEADERS[block_bytes(8):])
|
await headers.connect(len(headers), HEADERS[block_bytes(8):])
|
||||||
self.assertEqual(19, headers.height)
|
self.assertEqual(19, headers.height)
|
||||||
|
# verify from middle
|
||||||
|
await headers.repair(start_height=10)
|
||||||
|
self.assertEqual(19, headers.height)
|
||||||
|
|
||||||
|
async def test_misalignment_triggers_repair_on_open(self):
|
||||||
|
headers = Headers(':memory:')
|
||||||
|
headers.io.seek(0)
|
||||||
|
headers.io.write(HEADERS)
|
||||||
|
with self.assertLogs(level='WARN') as cm:
|
||||||
|
await headers.open()
|
||||||
|
self.assertEqual(cm.output, [])
|
||||||
|
headers.io.seek(0)
|
||||||
|
headers.io.truncate()
|
||||||
|
headers.io.write(HEADERS[:block_bytes(10)])
|
||||||
|
headers.io.write(b'ops')
|
||||||
|
headers.io.write(HEADERS[block_bytes(10):])
|
||||||
|
await headers.open()
|
||||||
|
self.assertEqual(
|
||||||
|
cm.output, [
|
||||||
|
'WARNING:lbry.wallet.header:Reader file size doesnt match header size. '
|
||||||
|
'Repairing, might take a while.',
|
||||||
|
'WARNING:lbry.wallet.header:Header file corrupted at height 9, truncating '
|
||||||
|
'it.'
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
async def test_concurrency(self):
|
async def test_concurrency(self):
|
||||||
BLOCKS = 19
|
BLOCKS = 19
|
||||||
|
|
Loading…
Reference in a new issue