forked from LBRYCommunity/lbry-sdk
hold headers file in memory during runtime
This commit is contained in:
parent
6a991e5c15
commit
ced368db31
2 changed files with 33 additions and 42 deletions
|
@ -42,23 +42,19 @@ class Headers:
|
|||
validate_difficulty: bool = True
|
||||
|
||||
def __init__(self, path) -> None:
|
||||
if path == ':memory:':
|
||||
self.io = BytesIO()
|
||||
self.io = BytesIO()
|
||||
self.path = path
|
||||
self._size: Optional[int] = None
|
||||
self.chunk_getter: Optional[Callable] = None
|
||||
self.executor = ThreadPoolExecutor(1)
|
||||
self.known_missing_checkpointed_chunks = set()
|
||||
self.check_chunk_lock = asyncio.Lock()
|
||||
|
||||
async def open(self):
|
||||
if not self.executor:
|
||||
self.executor = ThreadPoolExecutor(1)
|
||||
if self.path != ':memory:':
|
||||
if not os.path.exists(self.path):
|
||||
self.io = open(self.path, 'w+b')
|
||||
else:
|
||||
self.io = open(self.path, 'r+b')
|
||||
if os.path.exists(self.path):
|
||||
with open(self.path, 'r+b') as header_file:
|
||||
self.io.seek(0)
|
||||
self.io.write(header_file.read())
|
||||
bytes_size = self.io.seek(0, os.SEEK_END)
|
||||
self._size = bytes_size // self.header_size
|
||||
max_checkpointed_height = max(self.checkpoints.keys() or [-1]) + 1000
|
||||
|
@ -72,10 +68,12 @@ class Headers:
|
|||
await self.get_all_missing_headers()
|
||||
|
||||
async def close(self):
|
||||
if self.executor:
|
||||
self.executor.shutdown()
|
||||
self.executor = None
|
||||
self.io.close()
|
||||
if self.io is not None:
|
||||
flags = 'r+b' if os.path.exists(self.path) else 'w+b'
|
||||
with open(self.path, flags) as header_file:
|
||||
header_file.write(self.io.getbuffer())
|
||||
self.io.close()
|
||||
self.io = None
|
||||
|
||||
@staticmethod
|
||||
def serialize(header):
|
||||
|
@ -148,15 +146,14 @@ class Headers:
|
|||
await self.ensure_chunk_at(height)
|
||||
if not 0 <= height <= self.height:
|
||||
raise IndexError(f"{height} is out of bounds, current height: {self.height}")
|
||||
return await asyncio.get_running_loop().run_in_executor(self.executor, self._read, height)
|
||||
return self._read(height)
|
||||
|
||||
def _read(self, height, count=1):
|
||||
self.io.seek(height * self.header_size, os.SEEK_SET)
|
||||
return self.io.read(self.header_size * count)
|
||||
offset = height * self.header_size
|
||||
return bytes(self.io.getbuffer()[offset: offset + self.header_size * count])
|
||||
|
||||
def chunk_hash(self, start, count):
|
||||
self.io.seek(start * self.header_size, os.SEEK_SET)
|
||||
return self.hash_header(self.io.read(count * self.header_size)).decode()
|
||||
return self.hash_header(self._read(start, count)).decode()
|
||||
|
||||
async def ensure_checkpointed_size(self):
|
||||
max_checkpointed_height = max(self.checkpoints.keys() or [-1])
|
||||
|
@ -179,7 +176,7 @@ class Headers:
|
|||
)
|
||||
chunk_hash = self.hash_header(chunk).decode()
|
||||
if self.checkpoints.get(start) == chunk_hash:
|
||||
await asyncio.get_running_loop().run_in_executor(self.executor, self._write, start, chunk)
|
||||
self._write(start, chunk)
|
||||
if start in self.known_missing_checkpointed_chunks:
|
||||
self.known_missing_checkpointed_chunks.remove(start)
|
||||
return
|
||||
|
@ -194,22 +191,18 @@ class Headers:
|
|||
if normalized_height in self.checkpoints:
|
||||
return normalized_height not in self.known_missing_checkpointed_chunks
|
||||
|
||||
def _has_header(height):
|
||||
empty = '56944c5d3f98413ef45cf54545538103cc9f298e0575820ad3591376e2e0f65d'
|
||||
all_zeroes = '789d737d4f448e554b318c94063bbfa63e9ccda6e208f5648ca76ee68896557b'
|
||||
return self.chunk_hash(height, 1) not in (empty, all_zeroes)
|
||||
return await asyncio.get_running_loop().run_in_executor(self.executor, _has_header, height)
|
||||
empty = '56944c5d3f98413ef45cf54545538103cc9f298e0575820ad3591376e2e0f65d'
|
||||
all_zeroes = '789d737d4f448e554b318c94063bbfa63e9ccda6e208f5648ca76ee68896557b'
|
||||
return self.chunk_hash(height, 1) not in (empty, all_zeroes)
|
||||
|
||||
async def get_all_missing_headers(self):
|
||||
# Heavy operation done in one optimized shot
|
||||
def _io_checkall():
|
||||
for chunk_height, expected_hash in reversed(list(self.checkpoints.items())):
|
||||
if chunk_height in self.known_missing_checkpointed_chunks:
|
||||
continue
|
||||
if self.chunk_hash(chunk_height, 1000) != expected_hash:
|
||||
self.known_missing_checkpointed_chunks.add(chunk_height)
|
||||
return self.known_missing_checkpointed_chunks
|
||||
return await asyncio.get_running_loop().run_in_executor(self.executor, _io_checkall)
|
||||
for chunk_height, expected_hash in reversed(list(self.checkpoints.items())):
|
||||
if chunk_height in self.known_missing_checkpointed_chunks:
|
||||
continue
|
||||
if self.chunk_hash(chunk_height, 1000) != expected_hash:
|
||||
self.known_missing_checkpointed_chunks.add(chunk_height)
|
||||
return self.known_missing_checkpointed_chunks
|
||||
|
||||
@property
|
||||
def height(self) -> int:
|
||||
|
@ -241,7 +234,7 @@ class Headers:
|
|||
bail = True
|
||||
chunk = chunk[:(height-e.height)*self.header_size]
|
||||
if chunk:
|
||||
added += await asyncio.get_running_loop().run_in_executor(self.executor, self._write, height, chunk)
|
||||
added += self._write(height, chunk)
|
||||
if bail:
|
||||
break
|
||||
return added
|
||||
|
@ -306,9 +299,7 @@ class Headers:
|
|||
previous_header_hash = fail = None
|
||||
batch_size = 36
|
||||
for height in range(start_height, self.height, batch_size):
|
||||
headers = await asyncio.get_running_loop().run_in_executor(
|
||||
self.executor, self._read, height, batch_size
|
||||
)
|
||||
headers = self._read(height, batch_size)
|
||||
if len(headers) % self.header_size != 0:
|
||||
headers = headers[:(len(headers) // self.header_size) * self.header_size]
|
||||
for header_hash, header in self._iterate_headers(height, headers):
|
||||
|
@ -324,12 +315,11 @@ class Headers:
|
|||
assert start_height > 0 and height == start_height
|
||||
if fail:
|
||||
log.warning("Header file corrupted at height %s, truncating it.", height - 1)
|
||||
def __truncate(at_height):
|
||||
self.io.seek(max(0, (at_height - 1)) * self.header_size, os.SEEK_SET)
|
||||
self.io.truncate()
|
||||
self.io.flush()
|
||||
self._size = self.io.seek(0, os.SEEK_END) // self.header_size
|
||||
return await asyncio.get_running_loop().run_in_executor(self.executor, __truncate, height)
|
||||
self.io.seek(max(0, (height - 1)) * self.header_size, os.SEEK_SET)
|
||||
self.io.truncate()
|
||||
self.io.flush()
|
||||
self._size = self.io.seek(0, os.SEEK_END) // self.header_size
|
||||
return
|
||||
previous_header_hash = header_hash
|
||||
|
||||
@classmethod
|
||||
|
|
|
@ -192,6 +192,7 @@ class TestHeaders(AsyncioTestCase):
|
|||
reader_task = asyncio.create_task(reader())
|
||||
await writer()
|
||||
await reader_task
|
||||
await headers.close()
|
||||
|
||||
|
||||
HEADERS = unhexlify(
|
||||
|
|
Loading…
Reference in a new issue